Callback method invoked when a DATA frame has been demanded.
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ContentDemander_state.puml b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ContentDemander_state.puml new file mode 100644 index 00000000000..f9ee5b4af5a --- /dev/null +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/ContentDemander_state.puml @@ -0,0 +1,26 @@ +@startuml + +null: +content: +DEMANDING: +EOF: + +[*] --> null + +null --> DEMANDING : demand() +null --> EOF : eof() +null -left-> null : onTimeout() + +DEMANDING --> DEMANDING : demand() +DEMANDING --> content : onContent()\n onTimeout() +DEMANDING --> EOF : eof() + +EOF --> EOF : eof()\n onTimeout() + +note bottom of content: content1 -> content2 is only\nvalid if content1 is special +note top of content: content -> null only happens\nwhen content is not special +content --> content : onContent()\n onTimeout() +content --> null: take() +content --> EOF: eof() + +@enduml diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java index d9c49d60399..6b9202803ab 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnectionFactory.java @@ -157,7 +157,7 @@ public class HTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnectionF } @Override - public void onData(Stream stream, DataFrame frame, Callback callback) + public void onDataDemanded(Stream stream, DataFrame frame, Callback callback) { getConnection().onData((IStream)stream, frame, callback); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index a54e6769c55..f3b8100042c 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.http2.server; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.eclipse.jetty.http.BadMessageException; @@ -57,10 +58,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ private boolean _expect100Continue; private boolean _delayedUntilContent; private boolean _useOutputDirectByteBuffers; + private final ContentDemander _contentDemander; public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) { super(connector, configuration, endPoint, transport); + _contentDemander = new ContentDemander(); } protected IStream getStream() @@ -131,9 +134,18 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ _delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() && !endStream && !_expect100Continue && !connect; - // Delay the demand of DATA frames for CONNECT with :protocol. - if (!connect || request.getProtocol() == null) - getStream().demand(1); + // Delay the demand of DATA frames for CONNECT with :protocol + // or for normal requests expecting 100 continue. + if (connect) + { + if (request.getProtocol() == null) + _contentDemander.demand(false); + } + else + { + if (_delayedUntilContent) + _contentDemander.demand(false); + } if (LOG.isDebugEnabled()) { @@ -204,6 +216,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ { _expect100Continue = false; _delayedUntilContent = false; + _contentDemander.recycle(); super.recycle(); getHttpTransport().recycle(); } @@ -224,26 +237,16 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ @Override public Runnable onData(DataFrame frame, Callback callback) { - return onRequestContent(frame, callback); - } - - public Runnable onRequestContent(DataFrame frame, final Callback callback) - { - Stream stream = getStream(); - if (stream.isReset()) - { - // Consume previously queued content to - // enlarge the session flow control window. - consumeInput(); - // Consume immediately this content. - callback.succeeded(); - return null; - } - ByteBuffer buffer = frame.getData(); int length = buffer.remaining(); - boolean handle = onContent(new HttpInput.Content(buffer) + HttpInput.Content content = new HttpInput.Content(buffer) { + @Override + public boolean isEof() + { + return frame.isEndStream(); + } + @Override public void succeeded() { @@ -261,23 +264,31 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ { return callback.getInvocationType(); } - }); + }; + boolean needed = _contentDemander.onContent(content); + boolean handle = onContent(content); boolean endStream = frame.isEndStream(); if (endStream) { boolean handleContent = onContentComplete(); + // This will generate EOF -> must happen before onContentProducible. boolean handleRequest = onRequestComplete(); handle |= handleContent | handleRequest; } + boolean woken = needed && getRequest().getHttpInput().onContentProducible(); + handle |= woken; if (LOG.isDebugEnabled()) { - LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, handle: {}", + Stream stream = getStream(); + LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, woken: {}, needed: {}, handle: {}", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), length, endStream ? "last" : "some", + woken, + needed, handle); } @@ -286,6 +297,326 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ return handle || wasDelayed ? this : null; } + /** + * Demanding content is a marker content that is used to remember that a demand was + * registered into the stream. The {@code needed} flag indicates if the demand originated + * from a call to {@link #produceContent()} when false or {@link #needContent()} + * when true, as {@link HttpInput#onContentProducible()} must only be called + * only when {@link #needContent()} was called. + * Instances of this class must never escape the scope of this channel impl, + * so {@link #produceContent()} must never return one. + */ + private static final class DemandingContent extends HttpInput.SpecialContent + { + private final boolean needed; + + private DemandingContent(boolean needed) + { + this.needed = needed; + } + } + + private static final HttpInput.Content EOF = new HttpInput.EofContent(); + private static final HttpInput.Content DEMANDING_NEEDED = new DemandingContent(true); + private static final HttpInput.Content DEMANDING_NOT_NEEDED = new DemandingContent(false); + + private class ContentDemander + { + private final AtomicReferenceAttempts to perform an HTTP/1.1 upgrade.
*The upgrade looks up a {@link ConnectionFactory.Upgrading} from the connector @@ -534,13 +664,24 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque if (_delayedForContent) { _delayedForContent = false; - getRequest().getHttpInput().onIdleTimeout(timeout); + doOnIdleTimeout(timeout); execute(this); return false; } return true; } + private void doOnIdleTimeout(Throwable x) + { + boolean neverDispatched = getState().isIdle(); + boolean waitingForContent = _content == null || _content.remaining() == 0; + if ((waitingForContent || neverDispatched) && (_content == null || !_content.isSpecial())) + { + x.addSuppressed(new Throwable("HttpInput idle timeout")); + _content = new HttpInput.ErrorContent(x); + } + } + private static class RequestBuilder { private final HttpFields.Mutable _fieldsBuilder = HttpFields.build(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 3474a9dc271..c08291fbe07 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -107,12 +107,9 @@ public class HttpChannelState */ private enum InputState { - IDLE, // No isReady; No data - REGISTER, // isReady()==false handling; No data - REGISTERED, // isReady()==false !handling; No data - POSSIBLE, // isReady()==false async read callback called (http/1 only) - PRODUCING, // isReady()==false READ_PRODUCE action is being handled (http/1 only) - READY // isReady() was false, onContentAdded has been called + IDLE, // No isReady; No data + UNREADY, // isReady()==false; No data + READY // isReady() was false; data is available } /* @@ -137,8 +134,6 @@ public class HttpChannelState ASYNC_ERROR, // handle an async error ASYNC_TIMEOUT, // call asyncContext onTimeout WRITE_CALLBACK, // handle an IO write callback - READ_REGISTER, // Register for fill interest - READ_PRODUCE, // Check is a read is possible by parsing/filling READ_CALLBACK, // handle an IO read callback COMPLETE, // Complete the response by closing output TERMINATED, // No further actions @@ -465,19 +460,12 @@ public class HttpChannelState case ASYNC: switch (_inputState) { - case POSSIBLE: - _inputState = InputState.PRODUCING; - return Action.READ_PRODUCE; + case IDLE: + case UNREADY: + break; case READY: _inputState = InputState.IDLE; return Action.READ_CALLBACK; - case REGISTER: - case PRODUCING: - _inputState = InputState.REGISTERED; - return Action.READ_REGISTER; - case IDLE: - case REGISTERED: - break; default: throw new IllegalStateException(getStatusStringLocked()); @@ -1222,99 +1210,8 @@ public class HttpChannelState _channel.getRequest().setAttribute(name, attribute); } - /** - * Called to signal async read isReady() has returned false. - * This indicates that there is no content available to be consumed - * and that once the channel enters the ASYNC_WAIT state it will - * register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()} - * either from this method or from a subsequent call to {@link #unhandle()}. - */ - public void onReadUnready() - { - boolean interested = false; - try (AutoLock l = lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("onReadUnready {}", toStringLocked()); - - switch (_inputState) - { - case IDLE: - case READY: - if (_state == State.WAITING) - { - interested = true; - _inputState = InputState.REGISTERED; - } - else - { - _inputState = InputState.REGISTER; - } - break; - - case REGISTER: - case REGISTERED: - case POSSIBLE: - case PRODUCING: - break; - - default: - throw new IllegalStateException(toStringLocked()); - } - } - - if (interested) - _channel.onAsyncWaitForContent(); - } - - /** - * Called to signal that content is now available to read. - * If the channel is in ASYNC_WAIT state and unready (ie isReady() has - * returned false), then the state is changed to ASYNC_WOKEN and true - * is returned. - * - * @return True IFF the channel was unready and in ASYNC_WAIT state - */ - public boolean onContentAdded() - { - boolean woken = false; - try (AutoLock l = lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("onContentAdded {}", toStringLocked()); - - switch (_inputState) - { - case IDLE: - case READY: - break; - - case PRODUCING: - _inputState = InputState.READY; - break; - - case REGISTER: - case REGISTERED: - _inputState = InputState.READY; - if (_state == State.WAITING) - { - woken = true; - _state = State.WOKEN; - } - break; - - default: - throw new IllegalStateException(toStringLocked()); - } - } - return woken; - } - /** * Called to signal that the channel is ready for a callback. - * This is similar to calling {@link #onReadUnready()} followed by - * {@link #onContentAdded()}, except that as content is already - * available, read interest is never set. * * @return true if woken */ @@ -1328,7 +1225,11 @@ public class HttpChannelState switch (_inputState) { + case READY: + _inputState = InputState.READY; + break; case IDLE: + case UNREADY: _inputState = InputState.READY; if (_state == State.WAITING) { @@ -1344,25 +1245,20 @@ public class HttpChannelState return woken; } - /** - * Called to indicate that more content may be available, - * but that a handling thread may need to produce (fill/parse) - * it. Typically called by the async read success callback. - * - * @return {@code true} if more content may be available - */ - public boolean onReadPossible() + public boolean onReadEof() { boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onReadPossible {}", toStringLocked()); + LOG.debug("onReadEof {}", toStringLocked()); switch (_inputState) { - case REGISTERED: - _inputState = InputState.POSSIBLE; + case IDLE: + case READY: + case UNREADY: + _inputState = InputState.READY; if (_state == State.WAITING) { woken = true; @@ -1377,29 +1273,72 @@ public class HttpChannelState return woken; } - /** - * Called to signal that a read has read -1. - * Will wake if the read was called while in ASYNC_WAIT state - * - * @return {@code true} if woken - */ - public boolean onReadEof() + public void onContentAdded() { - boolean woken = false; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onEof {}", toStringLocked()); + LOG.debug("onContentAdded {}", toStringLocked()); - // Force read ready so onAllDataRead can be called - _inputState = InputState.READY; - if (_state == State.WAITING) + switch (_inputState) { - woken = true; - _state = State.WOKEN; + case IDLE: + case UNREADY: + case READY: + _inputState = InputState.READY; + break; + + default: + throw new IllegalStateException(toStringLocked()); + } + } + } + + public void onReadIdle() + { + try (AutoLock l = lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("onReadIdle {}", toStringLocked()); + + switch (_inputState) + { + case UNREADY: + case READY: + case IDLE: + _inputState = InputState.IDLE; + break; + + default: + throw new IllegalStateException(toStringLocked()); + } + } + } + + /** + * Called to indicate that more content may be available, + * but that a handling thread may need to produce (fill/parse) + * it. Typically called by the async read success callback. + */ + public void onReadUnready() + { + try (AutoLock l = lock()) + { + if (LOG.isDebugEnabled()) + LOG.debug("onReadUnready {}", toStringLocked()); + + switch (_inputState) + { + case IDLE: + case UNREADY: + case READY: // READY->UNREADY is needed by AsyncServletIOTest.testStolenAsyncRead + _inputState = InputState.UNREADY; + break; + + default: + throw new IllegalStateException(toStringLocked()); } } - return woken; } public boolean onWritePossible() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState_input.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState_input.puml new file mode 100644 index 00000000000..13eb5dc325b --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState_input.puml @@ -0,0 +1,84 @@ +@startuml +title HttpChannelState + +note top of onReadReady_inputState: onReadReady + +state "input state" as onReadReady_inputState { + state "IDLE" as onReadReady_IDLE + state "UNREADY" as onReadReady_UNREADY + state "READY" as onReadReady_READY + + state "channel state" as onReadReady_channelState { + state "WAITING" as onReadReady_WAITING + state "WOKEN" as onReadReady_WOKEN + onReadReady_WAITING --> onReadReady_WOKEN + } + + onReadReady_IDLE --> onReadReady_channelState + onReadReady_UNREADY --> onReadReady_channelState + + onReadReady_channelState --> onReadReady_READY + onReadReady_READY --> onReadReady_READY +} + + +note top of onReadEof_inputState: onReadEof + +state "input state" as onReadEof_inputState { + state "IDLE" as onReadEof_IDLE + state "UNREADY" as onReadEof_UNREADY + state "READY" as onReadEof_READY + + state "channel state" as onReadEof_channelState { + state "WAITING" as onReadEof_WAITING + state "WOKEN" as onReadEof_WOKEN + onReadEof_WAITING --> onReadEof_WOKEN + } + + onReadEof_IDLE --> onReadEof_channelState + onReadEof_UNREADY --> onReadEof_channelState + onReadEof_READY --> onReadEof_channelState + + onReadEof_channelState --> onReadEof_READY +} + + +note top of onReadIdle_inputState: onReadIdle + +state "input state" as onReadIdle_inputState { + state "IDLE" as onReadIdle_IDLE + state "UNREADY" as onReadIdle_UNREADY + state "READY" as onReadIdle_READY + + onReadIdle_IDLE --> onReadIdle_IDLE + onReadIdle_UNREADY --> onReadIdle_IDLE + onReadIdle_READY --> onReadIdle_IDLE +} + + +note top of onReadUnready_inputState: onReadUnready + +state "input state" as onReadUnready_inputState { + state "IDLE" as onReadUnready_IDLE + state "UNREADY" as onReadUnready_UNREADY + state "READY" as onReadUnready_READY + + onReadUnready_IDLE --> onReadUnready_UNREADY + onReadUnready_UNREADY --> onReadUnready_UNREADY + onReadUnready_READY --> onReadUnready_UNREADY +} + + +note top of onContentAdded_inputState: onContentAdded + +state "input state" as onContentAdded_inputState { + state "IDLE" as onContentAdded_IDLE + state "UNREADY" as onContentAdded_UNREADY + state "READY" as onContentAdded_READY + + onContentAdded_IDLE --> onContentAdded_READY + onContentAdded_UNREADY --> onContentAdded_READY + onContentAdded_READY --> onContentAdded_READY +} + +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 9b8e1061607..add0c44b9d3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -33,7 +33,6 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpParser; -import org.eclipse.jetty.http.HttpParser.RequestHandler; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.io.AbstractConnection; @@ -68,7 +67,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private final HttpParser _parser; private final AtomicInteger _contentBufferReferences = new AtomicInteger(); private volatile ByteBuffer _requestBuffer = null; - private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback(); private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback(); private final SendCallback _sendCallback = new SendCallback(); private final boolean _recordHttpComplianceViolations; @@ -316,21 +314,20 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } /** - * Fill and parse data looking for content - * - * @return true if an {@link RequestHandler} method was called and it returned true; + * Parse and fill data, looking for content */ - protected boolean fillAndParseForContent() + void parseAndFillForContent() { - boolean handled = false; + // When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method + // doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown() + int filled = Integer.MAX_VALUE; while (_parser.inContentState()) { - int filled = fillRequestBuffer(); - handled = parseRequestBuffer(); - if (handled || filled <= 0 || _input.hasContent()) + boolean handled = parseRequestBuffer(); + if (handled || filled <= 0) break; + filled = fillRequestBuffer(); } - return handled; } private int fillRequestBuffer() @@ -600,25 +597,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http public void asyncReadFillInterested() { - getEndPoint().fillInterested(_asyncReadCallback); - } - - public void blockingReadFillInterested() - { - // We try fillInterested here because of SSL and - // spurious wakeups. With blocking reads, we read in a loop - // that tries to read/parse content and blocks waiting if there is - // none available. The loop can be woken up by incoming encrypted - // bytes, which due to SSL might not produce any decrypted bytes. - // Thus the loop needs to register fill interest again. However if - // the loop is woken up spuriously, then the register interest again - // can result in a pending read exception, unless we use tryFillInterested. - getEndPoint().tryFillInterested(_blockingReadCallback); - } - - public void blockingReadFailure(Throwable e) - { - _blockingReadCallback.failed(e); + getEndPoint().tryFillInterested(_asyncReadCallback); } @Override @@ -655,8 +634,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void succeeded() { - if (_contentBufferReferences.decrementAndGet() == 0) + int counter = _contentBufferReferences.decrementAndGet(); + if (counter == 0) releaseRequestBuffer(); + // TODO: this should do something (warn? fail?) if _contentBufferReferences goes below 0 + if (counter < 0) + { + LOG.warn("Content reference counting went below zero: {}", counter); + _contentBufferReferences.incrementAndGet(); + } } @Override @@ -666,44 +652,30 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } } - private class BlockingReadCallback implements Callback - { - @Override - public void succeeded() - { - _input.unblock(); - } - - @Override - public void failed(Throwable x) - { - _input.failed(x); - } - - @Override - public InvocationType getInvocationType() - { - // This callback does not block, rather it wakes up the - // thread that is blocked waiting on the read. - return InvocationType.NON_BLOCKING; - } - } - private class AsyncReadCallback implements Callback { @Override public void succeeded() { - if (_channel.getState().onReadPossible()) + if (_channel.getRequest().getHttpInput().onContentProducible()) _channel.handle(); } @Override public void failed(Throwable x) { - if (_input.failed(x)) + if (_channel.failed(x)) _channel.handle(); } + + @Override + public InvocationType getInvocationType() + { + // This callback does not block when the HttpInput is in blocking mode, + // rather it wakes up the thread that is blocked waiting on the read; + // but it can if it is in async mode, hence the varying InvocationType. + return _channel.getRequest().getHttpInput().isAsync() ? InvocationType.BLOCKING : InvocationType.NON_BLOCKING; + } } private class SendCallback extends IteratingCallback diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 02c0be87ba4..d1aaa1d5c8f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -18,60 +18,349 @@ package org.eclipse.jetty.server; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Deque; import java.util.Objects; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; -import org.eclipse.jetty.http.BadMessageException; -import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.Destroyable; -import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link HttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}. - *
- * Content may arrive in patterns such as [content(), content(), messageComplete()] - * so that this class maintains two states: the content state that tells - * whether there is content to consume and the EOF state that tells whether an EOF has arrived. - * Only once the content has been consumed the content state is moved to the EOF state. - *
+ *While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link + * ContextHandler#handle(Runnable)} to setup classloaders etc.
*/ public class HttpInput extends ServletInputStream implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(HttpInput.class); + + private final byte[] _oneByteBuffer = new byte[1]; + private final BlockingContentProducer _blockingContentProducer; + private final AsyncContentProducer _asyncContentProducer; + + private final HttpChannelState _channelState; + private ContentProducer _contentProducer; + private boolean _consumedEof; + private ReadListener _readListener; + + public HttpInput(HttpChannelState state) + { + _channelState = state; + _asyncContentProducer = new AsyncContentProducer(state.getHttpChannel()); + _blockingContentProducer = new BlockingContentProducer(_asyncContentProducer); + _contentProducer = _blockingContentProducer; + } + + /* HttpInput */ + + public void recycle() + { + if (LOG.isDebugEnabled()) + LOG.debug("recycle {}", this); + _blockingContentProducer.recycle(); + _contentProducer = _blockingContentProducer; + _consumedEof = false; + _readListener = null; + } + /** - * An interceptor for HTTP Request input. - *- * Unlike InputStream wrappers that can be applied by filters, an interceptor - * is entirely transparent and works with async IO APIs. - *
- *- * An Interceptor may consume data from the passed content and the interceptor - * will continue to be called for the same content until the interceptor returns - * null or an empty content. Thus even if the passed content is completely consumed - * the interceptor will be called with the same content until it can no longer - * produce more content. - *
- * - * @see HttpInput#setInterceptor(Interceptor) - * @see HttpInput#addInterceptor(Interceptor) + * @return The current Interceptor, or null if none set */ + public Interceptor getInterceptor() + { + return _contentProducer.getInterceptor(); + } + + /** + * Set the interceptor. + * + * @param interceptor The interceptor to use. + */ + public void setInterceptor(Interceptor interceptor) + { + if (LOG.isDebugEnabled()) + LOG.debug("setting interceptor to {}", interceptor); + _contentProducer.setInterceptor(interceptor); + } + + /** + * Set the {@link Interceptor}, chaining it to the existing one if + * an {@link Interceptor} is already set. + * + * @param interceptor the next {@link Interceptor} in a chain + */ + public void addInterceptor(Interceptor interceptor) + { + Interceptor currentInterceptor = _contentProducer.getInterceptor(); + if (currentInterceptor == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("adding single interceptor: {}", interceptor); + _contentProducer.setInterceptor(interceptor); + } + else + { + ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); + if (LOG.isDebugEnabled()) + LOG.debug("adding chained interceptor: {}", chainedInterceptor); + _contentProducer.setInterceptor(chainedInterceptor); + } + } + + public long getContentReceived() + { + return _contentProducer.getRawContentArrived(); + } + + public boolean consumeAll() + { + if (LOG.isDebugEnabled()) + LOG.debug("consume all"); + boolean atEof = _contentProducer.consumeAll(new IOException("Unconsumed content")); + if (atEof) + _consumedEof = true; + + if (isFinished()) + return !isError(); + + return false; + } + + public boolean isError() + { + boolean error = _contentProducer.isError(); + if (LOG.isDebugEnabled()) + LOG.debug("isError = {}", error); + return error; + } + + public boolean isAsync() + { + if (LOG.isDebugEnabled()) + LOG.debug("isAsync read listener = " + _readListener); + return _readListener != null; + } + + /* ServletInputStream */ + + @Override + public boolean isFinished() + { + boolean finished = _consumedEof; + if (LOG.isDebugEnabled()) + LOG.debug("isFinished? {}", finished); + return finished; + } + + @Override + public boolean isReady() + { + boolean ready = _contentProducer.isReady(); + if (!ready) + { + if (LOG.isDebugEnabled()) + LOG.debug("isReady? false"); + return false; + } + + if (LOG.isDebugEnabled()) + LOG.debug("isReady? true"); + return true; + } + + @Override + public void setReadListener(ReadListener readListener) + { + if (LOG.isDebugEnabled()) + LOG.debug("setting read listener to {}", readListener); + if (_readListener != null) + throw new IllegalStateException("ReadListener already set"); + _readListener = Objects.requireNonNull(readListener); + //illegal if async not started + if (!_channelState.isAsyncStarted()) + throw new IllegalStateException("Async not started"); + + _contentProducer = _asyncContentProducer; + // trigger content production + if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN + scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead + } + + public boolean onContentProducible() + { + return _contentProducer.onContentProducible(); + } + + @Override + public int read() throws IOException + { + int read = read(_oneByteBuffer, 0, 1); + if (read == 0) + throw new IOException("unready read=0"); + return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + // Calculate minimum request rate for DoS protection + _contentProducer.checkMinDataRate(); + + Content content = _contentProducer.nextContent(); + if (content == null) + throw new IllegalStateException("read on unready input"); + if (!content.isSpecial()) + { + int read = content.get(b, off, len); + if (LOG.isDebugEnabled()) + LOG.debug("read produced {} byte(s)", read); + if (content.isEmpty()) + _contentProducer.reclaim(content); + return read; + } + + Throwable error = content.getError(); + if (LOG.isDebugEnabled()) + LOG.debug("read error = " + error); + if (error != null) + { + if (error instanceof IOException) + throw (IOException)error; + throw new IOException(error); + } + + if (content.isEof()) + { + if (LOG.isDebugEnabled()) + LOG.debug("read at EOF, setting consumed EOF to true"); + _consumedEof = true; + // If EOF do we need to wake for allDataRead callback? + if (onContentProducible()) + scheduleReadListenerNotification(); + return -1; + } + + throw new AssertionError("no data, no error and not EOF"); + } + + private void scheduleReadListenerNotification() + { + HttpChannel channel = _channelState.getHttpChannel(); + channel.execute(channel); + } + + /** + * Check if this HttpInput instance has content stored internally, without fetching/parsing + * anything from the underlying channel. + * @return true if the input contains content, false otherwise. + */ + public boolean hasContent() + { + // Do not call _contentProducer.available() as it calls HttpChannel.produceContent() + // which is forbidden by this method's contract. + boolean hasContent = _contentProducer.hasContent(); + if (LOG.isDebugEnabled()) + LOG.debug("hasContent = {}", hasContent); + return hasContent; + } + + @Override + public int available() + { + int available = _contentProducer.available(); + if (LOG.isDebugEnabled()) + LOG.debug("available = {}", available); + return available; + } + + /* Runnable */ + + /* + *While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link + * ContextHandler#handle(Runnable)} to setup classloaders etc.
+ */ + @Override + public void run() + { + Content content = _contentProducer.nextContent(); + if (LOG.isDebugEnabled()) + LOG.debug("running on content {}", content); + // The nextContent() call could return null if the transformer ate all + // the raw bytes without producing any transformed content. + if (content == null) + return; + + // This check is needed when a request is started async but no read listener is registered. + if (_readListener == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("running without a read listener"); + onContentProducible(); + return; + } + + if (content.isSpecial()) + { + Throwable error = content.getError(); + if (error != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("running has error: {}", (Object)error); + // TODO is this necessary to add here? + _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); + _readListener.onError(error); + } + else if (content.isEof()) + { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("running at EOF"); + _readListener.onAllDataRead(); + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("running failed onAllDataRead", x); + _readListener.onError(x); + } + } + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("running has content"); + try + { + _readListener.onDataAvailable(); + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("running failed onDataAvailable", x); + _readListener.onError(x); + } + } + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "@" + hashCode() + + " cs=" + _channelState + + " cp=" + _contentProducer + + " eof=" + _consumedEof; + } + public interface Interceptor { /** - * @param content The content to be intercepted (may be empty or a {@link SentinelContent}. + * @param content The content to be intercepted. * The content will be modified with any data the interceptor consumes, but there is no requirement * that all the data is consumed by the interceptor. * @return The intercepted content or null if interception is completed for that content. @@ -85,23 +374,23 @@ public class HttpInput extends ServletInputStream implements Runnable * {@link Interceptor#readFrom(Content)} and then passes any {@link Content} returned * to the next {@link Interceptor}. */ - public static class ChainedInterceptor implements Interceptor, Destroyable + private static class ChainedInterceptor implements Interceptor, Destroyable { private final Interceptor _prev; private final Interceptor _next; - public ChainedInterceptor(Interceptor prev, Interceptor next) + ChainedInterceptor(Interceptor prev, Interceptor next) { _prev = prev; _next = next; } - public Interceptor getPrev() + Interceptor getPrev() { return _prev; } - public Interceptor getNext() + Interceptor getNext() { return _next; } @@ -109,7 +398,10 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public Content readFrom(Content content) { - return getNext().readFrom(getPrev().readFrom(content)); + Content c = getPrev().readFrom(content); + if (c == null) + return null; + return getNext().readFrom(c); } @Override @@ -120,866 +412,22 @@ public class HttpInput extends ServletInputStream implements Runnable if (_next instanceof Destroyable) ((Destroyable)_next).destroy(); } - } - - private static final Logger LOG = LoggerFactory.getLogger(HttpInput.class); - static final Content EOF_CONTENT = new EofContent("EOF"); - static final Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF"); - - private final AutoLock.WithCondition _lock = new AutoLock.WithCondition(); - private final byte[] _oneByteBuffer = new byte[1]; - private Content _content; - private Content _intercepted; - private final Deque- * Typically this will result in an EOFException being thrown from a subsequent read rather than a -1 return. - * - * @return true if content channel woken for read - */ - public boolean earlyEOF() - { - return addContent(EARLY_EOF_CONTENT); - } - - /** - * This method should be called to signal that all the expected content arrived. - * - * @return true if content channel woken for read - */ - public boolean eof() - { - return addContent(EOF_CONTENT); - } - - public boolean consumeAll() - { - try (AutoLock l = _lock.lock()) - { - try - { - while (true) - { - Content item = nextContent(); - if (item == null) - break; // Let's not bother blocking - - skip(item, item.remaining()); - } - if (isFinished()) - return !isError(); - - _state = EARLY_EOF; - return false; - } - catch (Throwable e) - { - if (LOG.isDebugEnabled()) - LOG.debug("Unable to consume all input", e); - _state = new ErrorState(e); - return false; - } - } - } - - public boolean isError() - { - try (AutoLock l = _lock.lock()) - { - return _state instanceof ErrorState; - } - } - - public boolean isAsync() - { - try (AutoLock l = _lock.lock()) - { - return _state == ASYNC; - } - } - - @Override - public boolean isFinished() - { - try (AutoLock l = _lock.lock()) - { - return _state instanceof EOFState; - } - } - - @Override - public boolean isReady() - { - try - { - try (AutoLock l = _lock.lock()) - { - if (_listener == null) - return true; - if (_state instanceof EOFState) - return true; - if (_waitingForContent) - return false; - if (produceNextContext() != null) - return true; - _channelState.onReadUnready(); - _waitingForContent = true; - } - return false; - } - catch (IOException e) - { - LOG.trace("IGNORED", e); - return true; - } - } - - @Override - public void setReadListener(ReadListener readListener) - { - boolean woken = false; - try - { - try (AutoLock l = _lock.lock()) - { - if (_listener != null) - throw new IllegalStateException("ReadListener already set"); - - _listener = Objects.requireNonNull(readListener); - - //illegal if async not started - if (!_channelState.isAsyncStarted()) - throw new IllegalStateException("Async not started"); - - if (isError()) - { - woken = _channelState.onReadReady(); - } - else - { - Content content = produceNextContext(); - if (content != null) - { - _state = ASYNC; - woken = _channelState.onReadReady(); - } - else if (_state == EOF) - { - _state = AEOF; - woken = _channelState.onReadEof(); - } - else - { - _state = ASYNC; - _channelState.onReadUnready(); - _waitingForContent = true; - } - } - } - } - catch (IOException e) - { - throw new RuntimeIOException(e); - } - - if (woken) - wake(); - } - - public boolean onIdleTimeout(Throwable x) - { - try (AutoLock l = _lock.lock()) - { - boolean neverDispatched = getHttpChannelState().isIdle(); - if ((_waitingForContent || neverDispatched) && !isError()) - { - x.addSuppressed(new Throwable("HttpInput idle timeout")); - _state = new ErrorState(x); - return wakeup(); - } - return false; - } - } - - public boolean failed(Throwable x) - { - try (AutoLock l = _lock.lock()) - { - // Errors may be reported multiple times, for example - // a local idle timeout and a remote I/O failure. - if (isError()) - { - if (LOG.isDebugEnabled()) - { - // Log both the original and current failure - // without modifying the original failure. - Throwable failure = new Throwable(_state.getError()); - failure.addSuppressed(x); - LOG.debug("HttpInput failure", failure); - } - } - else - { - // Add a suppressed throwable to capture this stack - // trace without wrapping/hiding the original failure. - x.addSuppressed(new Throwable("HttpInput failure")); - _state = new ErrorState(x); - } - return wakeup(); - } - } - - private boolean wakeup() - { - assert _lock.isHeldByCurrentThread(); - if (_listener != null) - return _channelState.onContentAdded(); - _lock.signal(); - return false; - } - - /* - *
While this class is-a Runnable, it should never be dispatched in it's own thread. It is a runnable only so that the calling thread can use {@link - * ContextHandler#handle(Runnable)} to setup classloaders etc.
- */ - @Override - public void run() - { - final ReadListener listener; - Throwable error; - boolean aeof = false; - - try (AutoLock l = _lock.lock()) - { - listener = _listener; - - if (_state == EOF) - return; - - if (_state == AEOF) - { - _state = EOF; - aeof = true; - } - - error = _state.getError(); - - if (!aeof && error == null) - { - Content content = nextInterceptedContent(); - if (content == null) - return; - - // Consume a directly received EOF without first calling onDataAvailable - // So -1 will never be read and only onAddDataRread or onError will be called - if (content instanceof EofContent) - { - consume(content); - if (_state == EARLY_EOF) - error = _state.getError(); - else if (_state == AEOF) - { - aeof = true; - _state = EOF; - } - } - } - } - - try - { - if (error != null) - { - // TODO is this necessary to add here? - _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - listener.onError(error); - } - else if (aeof) - { - listener.onAllDataRead(); - } - else - { - listener.onDataAvailable(); - // If -1 was read, then HttpChannelState#onEOF will have been called and a subsequent - // unhandle will call run again so onAllDataRead() can be called. - } - } - catch (Throwable e) - { - if (LOG.isDebugEnabled()) - LOG.warn("Unable to notify listener", e); - else - LOG.warn("Unable to notify listener: {}", e.toString()); - try - { - if (aeof || error == null) - { - _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); - listener.onError(e); - } - } - catch (Throwable e2) - { - String msg = "Unable to notify error to listener"; - if (LOG.isDebugEnabled()) - LOG.warn(msg, e2); - else - LOG.warn("{}: {}", msg, e2.toString()); - throw new RuntimeIOException(msg, e2); - } - } - } - - @Override - public String toString() - { - State state; - long consumed; - int q; - Content content; - try (AutoLock l = _lock.lock()) - { - state = _state; - consumed = _contentConsumed; - q = _inputQ.size(); - content = _inputQ.peekFirst(); - } - return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]", - getClass().getSimpleName(), - hashCode(), - consumed, - q, - content, - state); - } - - /** - * A Sentinel Content, which has zero length content but - * indicates some other event in the input stream (eg EOF) - */ - public static class SentinelContent extends Content - { - private final String _name; - - public SentinelContent(String name) - { - super(BufferUtil.EMPTY_BUFFER); - _name = name; - } @Override public String toString() { - return _name; - } - } - - public static class EofContent extends SentinelContent - { - EofContent(String name) - { - super(name); + return getClass().getSimpleName() + "@" + hashCode() + " [p=" + _prev + ",n=" + _next + "]"; } } + /** + * A content represents the production of a {@link HttpChannel} returned by {@link HttpChannel#produceContent()}. + * There are two fundamental types of content: special and non-special. + * Non-special content always wraps a byte buffer that can be consumed and must be recycled once it is empty, either + * via {@link #succeeded()} or {@link #failed(Throwable)}. + * Special content indicates a special event, like EOF or an error and never wraps a byte buffer. Calling + * {@link #succeeded()} or {@link #failed(Throwable)} on those have no effect. + */ public static class Content implements Callback { protected final ByteBuffer _content; @@ -989,6 +437,10 @@ public class HttpInput extends ServletInputStream implements Runnable _content = content; } + /** + * Get the wrapped byte buffer. Throws {@link IllegalStateException} if the content is special. + * @return the wrapped byte buffer. + */ public ByteBuffer getByteBuffer() { return _content; @@ -1000,6 +452,13 @@ public class HttpInput extends ServletInputStream implements Runnable return InvocationType.NON_BLOCKING; } + /** + * Read the wrapped byte buffer. Throws {@link IllegalStateException} if the content is special. + * @param buffer The array into which bytes are to be written. + * @param offset The offset within the array of the first byte to be written. + * @param length The maximum number of bytes to be written to the given array. + * @return The amount of bytes read from the buffer. + */ public int get(byte[] buffer, int offset, int length) { length = Math.min(_content.remaining(), length); @@ -1007,6 +466,11 @@ public class HttpInput extends ServletInputStream implements Runnable return length; } + /** + * Skip some bytes from the buffer. Has no effect on a special content. + * @param length How many bytes to skip. + * @return How many bytes were skipped. + */ public int skip(int length) { length = Math.min(_content.remaining(), length); @@ -1014,55 +478,193 @@ public class HttpInput extends ServletInputStream implements Runnable return length; } + /** + * Check if there is at least one byte left in the buffer. + * Always false on a special content. + * @return true if there is at least one byte left in the buffer. + */ public boolean hasContent() { return _content.hasRemaining(); } + /** + * Get the number of bytes remaining in the buffer. + * Always 0 on a special content. + * @return the number of bytes remaining in the buffer. + */ public int remaining() { return _content.remaining(); } + /** + * Check if the buffer is empty. + * Always true on a special content. + * @return true if there is 0 byte left in the buffer. + */ public boolean isEmpty() { return !_content.hasRemaining(); } - @Override - public String toString() - { - return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content)); - } - } - - protected abstract static class State - { - public boolean blockForContent(HttpInput in) throws IOException + /** + * Check if the content is special. + * @return true if the content is special, false otherwise. + */ + public boolean isSpecial() { return false; } - public int noContent() throws IOException + /** + * Check if EOF was reached. Both special and non-special content + * can have this flag set to true but in the case of non-special content, + * this can be interpreted as a hint as it is always going to be followed + * by another content that is both special and EOF. + * @return true if EOF was reached, false otherwise. + */ + public boolean isEof() { - return -1; + return false; } + /** + * Get the reported error. Only special contents can have an error. + * @return the error or null if there is none. + */ public Throwable getError() { return null; } + + @Override + public String toString() + { + return String.format("%s@%x{%s,spc=%s,eof=%s,err=%s}", getClass().getSimpleName(), hashCode(), + BufferUtil.toDetailString(_content), isSpecial(), isEof(), getError()); + } } - protected static class EOFState extends State + /** + * Simple non-special content wrapper allow overriding the EOF flag. + */ + public static class WrappingContent extends Content { + private final Content _delegate; + private final boolean _eof; + + public WrappingContent(Content delegate, boolean eof) + { + super(delegate.getByteBuffer()); + _delegate = delegate; + _eof = eof; + } + + @Override + public boolean isEof() + { + return _eof; + } + + @Override + public void succeeded() + { + _delegate.succeeded(); + } + + @Override + public void failed(Throwable x) + { + _delegate.failed(x); + } + + @Override + public InvocationType getInvocationType() + { + return _delegate.getInvocationType(); + } } - protected class ErrorState extends EOFState + /** + * Abstract class that implements the standard special content behavior. + */ + public abstract static class SpecialContent extends Content { - final Throwable _error; + public SpecialContent() + { + super(null); + } - ErrorState(Throwable error) + @Override + public final ByteBuffer getByteBuffer() + { + throw new IllegalStateException(this + " has no buffer"); + } + + @Override + public final int get(byte[] buffer, int offset, int length) + { + throw new IllegalStateException(this + " has no buffer"); + } + + @Override + public final int skip(int length) + { + return 0; + } + + @Override + public final boolean hasContent() + { + return false; + } + + @Override + public final int remaining() + { + return 0; + } + + @Override + public final boolean isEmpty() + { + return true; + } + + @Override + public final boolean isSpecial() + { + return true; + } + } + + /** + * EOF special content. + */ + public static final class EofContent extends SpecialContent + { + @Override + public boolean isEof() + { + return true; + } + + @Override + public String toString() + { + return getClass().getSimpleName(); + } + } + + /** + * Error special content. + */ + public static final class ErrorContent extends SpecialContent + { + private final Throwable _error; + + public ErrorContent(Throwable error) { _error = error; } @@ -1073,88 +675,10 @@ public class HttpInput extends ServletInputStream implements Runnable return _error; } - @Override - public int noContent() throws IOException - { - if (_error instanceof IOException) - throw (IOException)_error; - throw new IOException(_error); - } - @Override public String toString() { - return "ERROR:" + _error; + return getClass().getSimpleName() + " [" + _error + "]"; } } - - protected static final State STREAM = new State() - { - @Override - public boolean blockForContent(HttpInput input) throws IOException - { - input.blockForContent(); - return true; - } - - @Override - public String toString() - { - return "STREAM"; - } - }; - - protected static final State ASYNC = new State() - { - @Override - public int noContent() throws IOException - { - return 0; - } - - @Override - public String toString() - { - return "ASYNC"; - } - }; - - protected static final State EARLY_EOF = new EOFState() - { - @Override - public int noContent() throws IOException - { - throw getError(); - } - - @Override - public String toString() - { - return "EARLY_EOF"; - } - - @Override - public IOException getError() - { - return new EofException("Early EOF"); - } - }; - - protected static final State EOF = new EOFState() - { - @Override - public String toString() - { - return "EOF"; - } - }; - - protected static final State AEOF = new EOFState() - { - @Override - public String toString() - { - return "AEOF"; - } - }; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java deleted file mode 100644 index 6f646c4ce61..00000000000 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ /dev/null @@ -1,35 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// -// This program and the accompanying materials are made available under -// the terms of the Eclipse Public License 2.0 which is available at -// https://www.eclipse.org/legal/epl-2.0 -// -// This Source Code may also be made available under the following -// Secondary Licenses when the conditions for such availability set -// forth in the Eclipse Public License, v. 2.0 are satisfied: -// the Apache License v2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0 -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// ======================================================================== -// - -package org.eclipse.jetty.server; - -import java.io.IOException; - -public class HttpInputOverHTTP extends HttpInput -{ - public HttpInputOverHTTP(HttpChannelState state) - { - super(state); - } - - @Override - protected void produceContent() throws IOException - { - ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent(); - } -} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputState.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputState.puml new file mode 100644 index 00000000000..0ef1896b5fe --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputState.puml @@ -0,0 +1,16 @@ +@startuml + +IDLE: +READY: +UNREADY: + +[*] --> IDLE + +IDLE --> UNREADY : isReady +IDLE -right->READY : isReady + +UNREADY -up-> READY : ASYNC onContentProducible + +READY -left->IDLE : nextContent + +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_async.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_async.puml new file mode 100644 index 00000000000..c520361faef --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_async.puml @@ -0,0 +1,114 @@ +@startuml +title "HttpInput" + +participant AsyncContentDelivery as "[async\ncontent\ndelivery]" +participant HttpChannel as "Http\nChannel\n" +participant HttpChannelState as "Http\nChannel\nState" +participant HttpInputInterceptor as "Http\nInput.\nInterceptor" +participant AsyncContentProducer as "Async\nContent\nProducer" +participant HttpInput as "Http\nInput\n" +participant Application as "\nApplication\n" + +autoactivate on + +== Async Read == + +Application->HttpInput: read +activate Application + HttpInput->AsyncContentProducer: nextContent + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + AsyncContentProducer->HttpChannel: produceContent + return raw content or null + alt if raw content is not null + AsyncContentProducer->HttpInputInterceptor: readFrom + return transformed content + end + return + alt if transformed content is not null + AsyncContentProducer->HttpChannelState: onReadIdle + return + end + return content or null + note over HttpInput + throw ISE + if content + is null + end note + HttpInput->AsyncContentProducer: reclaim + return +return +deactivate Application + +== isReady == + +Application->HttpInput: isReady +activate Application + HttpInput->AsyncContentProducer: isReady + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + AsyncContentProducer->HttpChannel: produceContent + return raw content or null + alt if raw content is not null + AsyncContentProducer->HttpInputInterceptor: readFrom + return transformed content + end + return + alt if transformed content is not null + AsyncContentProducer->HttpChannelState: onContentAdded + return + else transformed content is null + AsyncContentProducer->HttpChannelState: onReadUnready + return + AsyncContentProducer->HttpChannel: needContent + return + alt if needContent returns true + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + return + alt if transformed content is not null + AsyncContentProducer->HttpChannelState: onContentAdded + return + end + end + end + return boolean\n[transformed\ncontent is not null] +return +deactivate Application + +alt if content arrives + AsyncContentDelivery->HttpInput: onContentProducible + HttpInput->AsyncContentProducer: onContentProducible + alt if not at EOF + AsyncContentProducer->HttpChannelState: onReadReady + return true if woken + else if at EOF + AsyncContentProducer->HttpChannelState: onReadEof + return true if woken + end + return true if woken + return true if woken + alt onContentProducible returns true + AsyncContentDelivery->HttpChannel: execute(HttpChannel) + return + end +end + +||| + +== available == + +Application->HttpInput: available +activate Application + HttpInput->AsyncContentProducer: available + AsyncContentProducer->AsyncContentProducer: next\nTransformed\nContent + AsyncContentProducer->HttpChannel: produceContent + return raw content or null + alt if raw content is not null + AsyncContentProducer->HttpInputInterceptor: readFrom + return transformed content + end + return + return content size or\n0 if content is null +return +deactivate Application + +||| +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_blocking.puml b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_blocking.puml new file mode 100644 index 00000000000..06cb82c4cfd --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput_blocking.puml @@ -0,0 +1,64 @@ +@startuml +title "HttpInput" + +participant AsyncContentDelivery as "[async\ncontent\ndelivery]" +participant HttpChannel as "Http\nChannel\n" +participant HttpChannelState as "Http\nChannel\nState" +participant AsyncContentProducer as "Async\nContent\nProducer" +participant Semaphore as "\nSemaphore\n" +participant BlockingContentProducer as "Blocking\nContent\nProducer" +participant HttpInput as "Http\nInput\n" +participant Application as "\nApplication\n" + +autoactivate on + +== Blocking Read == + +Application->HttpInput: read +activate Application + HttpInput->BlockingContentProducer: nextContent + loop + BlockingContentProducer->AsyncContentProducer: nextContent + AsyncContentProducer->AsyncContentProducer: nextTransformedContent + AsyncContentProducer->HttpChannel: produceContent + return + return + alt content is not null + AsyncContentProducer->HttpChannelState: onReadIdle + return + end + return content or null + alt content is null + BlockingContentProducer->HttpChannelState: onReadUnready + return + BlockingContentProducer->HttpChannel: needContent + return + alt needContent returns false + BlockingContentProducer->Semaphore: acquire + return + else needContent returns true + note over BlockingContentProducer + continue loop + end note + end + else content is not null + return non-null content + end + end + ' return from BlockingContentProducer: nextContent + HttpInput->BlockingContentProducer: reclaim + BlockingContentProducer->AsyncContentProducer: reclaim + return + return +return +deactivate Application + +alt if content arrives + AsyncContentDelivery->HttpInput: wakeup + HttpInput->BlockingContentProducer: wakeup + BlockingContentProducer->Semaphore: release + return + return false + return false +end +@enduml diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index a431184250a..934cd2e6a37 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -749,7 +749,7 @@ public class Request implements HttpServletRequest public long getContentRead() { - return _input.getContentConsumed(); + return _input.getContentReceived(); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java index 5c0efbf6695..d67c26027d7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java @@ -48,6 +48,7 @@ import javax.servlet.http.HttpSessionListener; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpCookie; +import org.eclipse.jetty.http.Syntax; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.SessionIdManager; @@ -645,7 +646,7 @@ public class SessionHandler extends ScopedHandler HttpCookie cookie = null; cookie = new HttpCookie( - _cookieConfig.getName(), + getSessionCookieName(_cookieConfig), id, _cookieConfig.getDomain(), sessionPath, @@ -1334,6 +1335,13 @@ public class SessionHandler extends ScopedHandler public Session getSession(); } + public static String getSessionCookieName(SessionCookieConfig config) + { + if (config == null || config.getName() == null) + return __DefaultSessionCookie; + return config.getName(); + } + /** * CookieConfig * @@ -1423,6 +1431,10 @@ public class SessionHandler extends ScopedHandler { if (_context != null && _context.getContextHandler().isAvailable()) throw new IllegalStateException("CookieConfig cannot be set after ServletContext is started"); + if ("".equals(name)) + throw new IllegalArgumentException("Blank cookie name"); + if (name != null) + Syntax.requireValidRFC2616Token(name, "Bad Session cookie name"); _sessionCookie = name; } @@ -1596,12 +1608,12 @@ public class SessionHandler extends ScopedHandler Cookie[] cookies = request.getCookies(); if (cookies != null && cookies.length > 0) { - final String sessionCookie = getSessionCookieConfig().getName(); - for (int i = 0; i < cookies.length; i++) + final String sessionCookie = getSessionCookieName(getSessionCookieConfig()); + for (Cookie cookie : cookies) { - if (sessionCookie.equalsIgnoreCase(cookies[i].getName())) + if (sessionCookie.equalsIgnoreCase(cookie.getName())) { - String id = cookies[i].getValue(); + String id = cookie.getValue(); requestedSessionIdFromCookie = true; if (LOG.isDebugEnabled()) LOG.debug("Got Session ID {} from cookie {}", id, sessionCookie); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java new file mode 100644 index 00000000000..379a9386431 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncContentProducerTest.java @@ -0,0 +1,340 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPOutputStream; + +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; +import org.eclipse.jetty.util.compression.CompressionPool; +import org.eclipse.jetty.util.compression.InflaterPool; +import org.hamcrest.core.Is; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class AsyncContentProducerTest +{ + private ScheduledExecutorService scheduledExecutorService; + private InflaterPool inflaterPool; + + @BeforeEach + public void setUp() + { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + inflaterPool = new InflaterPool(-1, true); + } + + @AfterEach + public void tearDown() + { + scheduledExecutorService.shutdownNow(); + } + + @Test + public void testAsyncContentProducerNoInterceptor() throws Exception + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, nullValue()); + } + + @Test + public void testAsyncContentProducerNoInterceptorWithError() throws Exception + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + final Throwable expectedError = new EofException("Early EOF"); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, Is.is(expectedError)); + } + + @Test + public void testAsyncContentProducerGzipInterceptor() throws Exception + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, nullValue()); + } + + @Test + public void testAsyncContentProducerGzipInterceptorWithTinyBuffers() throws Exception + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.EofContent(), scheduledExecutorService, barrier)); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 1)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, totalContentBytesCount + buffers.length + 2, 25, 4, barrier); + assertThat(error, nullValue()); + } + + @Test + public void testBlockingContentProducerGzipInterceptorWithError() throws Exception + { + ByteBuffer[] uncompressedBuffers = new ByteBuffer[3]; + uncompressedBuffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + uncompressedBuffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(uncompressedBuffers); + final String originalContentString = asString(uncompressedBuffers); + final Throwable expectedError = new Throwable("HttpInput idle timeout"); + + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = gzipByteBuffer(uncompressedBuffers[0]); + buffers[1] = gzipByteBuffer(uncompressedBuffers[1]); + buffers[2] = gzipByteBuffer(uncompressedBuffers[2]); + + CyclicBarrier barrier = new CyclicBarrier(2); + + ContentProducer contentProducer = new AsyncContentProducer(new ArrayDelayedHttpChannel(buffers, new HttpInput.ErrorContent(expectedError), scheduledExecutorService, barrier)); + contentProducer.setInterceptor(new GzipHttpInputInterceptor(inflaterPool, new ArrayByteBufferPool(1, 1, 2), 32)); + + Throwable error = readAndAssertContent(totalContentBytesCount, originalContentString, contentProducer, (buffers.length + 1) * 2, 0, 4, barrier); + assertThat(error, Is.is(expectedError)); + } + + private Throwable readAndAssertContent(int totalContentBytesCount, String originalContentString, ContentProducer contentProducer, int totalContentCount, int readyCount, int notReadyCount, CyclicBarrier barrier) throws InterruptedException, BrokenBarrierException, TimeoutException + { + int readBytes = 0; + String consumedString = ""; + int nextContentCount = 0; + int isReadyFalseCount = 0; + int isReadyTrueCount = 0; + Throwable error = null; + + while (true) + { + if (contentProducer.isReady()) + isReadyTrueCount++; + else + isReadyFalseCount++; + + HttpInput.Content content = contentProducer.nextContent(); + nextContentCount++; + if (content == null) + { + barrier.await(5, TimeUnit.SECONDS); + content = contentProducer.nextContent(); + nextContentCount++; + } + assertThat(content, notNullValue()); + + if (content.isSpecial()) + { + if (content.isEof()) + break; + error = content.getError(); + break; + } + + byte[] b = new byte[content.remaining()]; + readBytes += b.length; + content.getByteBuffer().get(b); + consumedString += new String(b, StandardCharsets.ISO_8859_1); + content.skip(content.remaining()); + } + + assertThat(nextContentCount, is(totalContentCount)); + assertThat(readBytes, is(totalContentBytesCount)); + assertThat(consumedString, is(originalContentString)); + assertThat(isReadyFalseCount, is(notReadyCount)); + assertThat(isReadyTrueCount, is(readyCount)); + return error; + } + + private static int countRemaining(ByteBuffer[] byteBuffers) + { + int total = 0; + for (ByteBuffer byteBuffer : byteBuffers) + { + total += byteBuffer.remaining(); + } + return total; + } + + private static String asString(ByteBuffer[] buffers) + { + StringBuilder sb = new StringBuilder(); + for (ByteBuffer buffer : buffers) + { + byte[] b = new byte[buffer.remaining()]; + buffer.duplicate().get(b); + sb.append(new String(b, StandardCharsets.ISO_8859_1)); + } + return sb.toString(); + } + + private static ByteBuffer gzipByteBuffer(ByteBuffer uncompressedBuffer) + { + try + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + GZIPOutputStream output = new GZIPOutputStream(baos); + + byte[] b = new byte[uncompressedBuffer.remaining()]; + uncompressedBuffer.get(b); + output.write(b); + + output.close(); + return ByteBuffer.wrap(baos.toByteArray()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static class ArrayDelayedHttpChannel extends HttpChannel + { + private final ByteBuffer[] byteBuffers; + private final HttpInput.Content finalContent; + private final ScheduledExecutorService scheduledExecutorService; + private final CyclicBarrier barrier; + private int counter; + private volatile HttpInput.Content nextContent; + + public ArrayDelayedHttpChannel(ByteBuffer[] byteBuffers, HttpInput.Content finalContent, ScheduledExecutorService scheduledExecutorService, CyclicBarrier barrier) + { + super(new MockConnector(), new HttpConfiguration(), null, null); + this.byteBuffers = new ByteBuffer[byteBuffers.length]; + this.finalContent = finalContent; + this.scheduledExecutorService = scheduledExecutorService; + this.barrier = barrier; + for (int i = 0; i < byteBuffers.length; i++) + { + this.byteBuffers[i] = byteBuffers[i].duplicate(); + } + } + + @Override + public boolean needContent() + { + if (nextContent != null) + return true; + scheduledExecutorService.schedule(() -> + { + if (byteBuffers.length > counter) + nextContent = new HttpInput.Content(byteBuffers[counter++]); + else + nextContent = finalContent; + try + { + barrier.await(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + throw new AssertionError(e); + } + }, 50, TimeUnit.MILLISECONDS); + return false; + } + + @Override + public HttpInput.Content produceContent() + { + HttpInput.Content result = nextContent; + nextContent = null; + return result; + } + + @Override + public boolean failAllContent(Throwable failure) + { + nextContent = null; + counter = byteBuffers.length; + return false; + } + + @Override + public boolean failed(Throwable x) + { + return false; + } + + @Override + protected boolean eof() + { + return false; + } + } +} diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java new file mode 100644 index 00000000000..a8f8fdb7dfc --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/BlockingContentProducerTest.java @@ -0,0 +1,320 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.GZIPOutputStream; + +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.server.handler.gzip.GzipHttpInputInterceptor; +import org.eclipse.jetty.util.compression.InflaterPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; + +public class BlockingContentProducerTest +{ + private ScheduledExecutorService scheduledExecutorService; + private InflaterPool inflaterPool; + + @BeforeEach + public void setUp() + { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + inflaterPool = new InflaterPool(-1, true); + } + + @AfterEach + public void tearDown() + { + scheduledExecutorService.shutdownNow(); + } + + @Test + public void testBlockingContentProducerNoInterceptor() + { + ByteBuffer[] buffers = new ByteBuffer[3]; + buffers[0] = ByteBuffer.wrap("1 hello 1".getBytes(StandardCharsets.ISO_8859_1)); + buffers[1] = ByteBuffer.wrap("2 howdy 2".getBytes(StandardCharsets.ISO_8859_1)); + buffers[2] = ByteBuffer.wrap("3 hey ya 3".getBytes(StandardCharsets.ISO_8859_1)); + final int totalContentBytesCount = countRemaining(buffers); + final String originalContentString = asString(buffers); + + AtomicReference- * This will return the default {@link WebSocketUpgradeFilter} on the - * provided {@link ServletContext}, creating the filter if necessary. + * Ensure a {@link WebSocketUpgradeFilter} is available on the provided {@link ServletContext}, + * a new filter will added if one does not already exist. *
** The default {@link WebSocketUpgradeFilter} is also available via * the {@link ServletContext} attribute named {@code org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter} *
* - * @param servletContext the {@link ServletContext} to use - * @return the configured default {@link WebSocketUpgradeFilter} instance + * @param servletContext the {@link ServletContext} to use. + * @return the configured default {@link WebSocketUpgradeFilter} instance. */ public static FilterHolder ensureFilter(ServletContext servletContext) { @@ -132,8 +137,6 @@ public class WebSocketUpgradeFilter implements Filter, Dumpable } } - public static final String MAPPING_ATTRIBUTE_INIT_PARAM = "org.eclipse.jetty.websocket.util.server.internal.WebSocketMapping.key"; - private final Configuration.ConfigurationCustomizer defaultCustomizer = new Configuration.ConfigurationCustomizer(); private WebSocketMapping mapping; @@ -174,10 +177,9 @@ public class WebSocketUpgradeFilter implements Filter, Dumpable final ServletContext context = config.getServletContext(); String mappingKey = config.getInitParameter(MAPPING_ATTRIBUTE_INIT_PARAM); - if (mappingKey != null) - mapping = WebSocketMapping.ensureMapping(context, mappingKey); - else - mapping = new WebSocketMapping(WebSocketServerComponents.getWebSocketComponents(context)); + if (mappingKey == null) + throw new ServletException("the WebSocketMapping init param must be set"); + mapping = WebSocketMapping.ensureMapping(context, mappingKey); String max = config.getInitParameter("idleTimeout"); if (max == null) diff --git a/jetty-websocket/websocket-util-server/src/main/java/org/eclipse/jetty/websocket/util/server/internal/WebSocketMapping.java b/jetty-websocket/websocket-util-server/src/main/java/org/eclipse/jetty/websocket/util/server/internal/WebSocketMapping.java index 5bef84d466f..e93c0c4c019 100644 --- a/jetty-websocket/websocket-util-server/src/main/java/org/eclipse/jetty/websocket/util/server/internal/WebSocketMapping.java +++ b/jetty-websocket/websocket-util-server/src/main/java/org/eclipse/jetty/websocket/util/server/internal/WebSocketMapping.java @@ -63,7 +63,6 @@ public class WebSocketMapping implements Dumpable, LifeCycle.Listener public static WebSocketMapping getMapping(ServletContext servletContext, String mappingKey) { Object mappingObject = servletContext.getAttribute(mappingKey); - if (mappingObject != null) { if (mappingObject instanceof WebSocketMapping) @@ -86,7 +85,6 @@ public class WebSocketMapping implements Dumpable, LifeCycle.Listener public static WebSocketMapping ensureMapping(ServletContext servletContext, String mappingKey) { WebSocketMapping mapping = getMapping(servletContext, mappingKey); - if (mapping == null) { mapping = new WebSocketMapping(WebSocketServerComponents.getWebSocketComponents(servletContext)); @@ -135,7 +133,7 @@ public class WebSocketMapping implements Dumpable, LifeCycle.Listener throw new IllegalArgumentException("Unrecognized path spec syntax [" + rawSpec + "]"); } - public static final String DEFAULT_KEY = "org.eclipse.jetty.websocket.util.server.internal.WebSocketMapping"; + public static final String DEFAULT_KEY = "jetty.websocket.defaultMapping"; private final PathMappings