From 8e07ede5f976de11395a73dbffddfbb761a4daab Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Fri, 3 May 2024 14:09:35 +0200 Subject: [PATCH] Fix `IllegalArgumentException: demand pending` (#11721) * Do not attempt to read from the underlying content source when there's a demand pending, i.e.: when inputState is unready * document the inputState FSM and improve the doc of its internal API Signed-off-by: Ludovic Orban Signed-off-by: Simone Bordet Co-authored-by: Simone Bordet --- .../ee10/servlet/AsyncContentProducer.java | 16 +- .../jetty/ee10/servlet/ContentProducer.java | 2 +- .../eclipse/jetty/ee10/servlet/HttpInput.java | 12 +- .../jetty/ee10/servlet/ServletChannel.java | 3 +- .../ee10/servlet/ServletChannelState.java | 107 ++++++----- .../ee10/servlet/AsyncServletIOTest.java | 179 +++++------------- 6 files changed, 129 insertions(+), 190 deletions(-) diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/AsyncContentProducer.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/AsyncContentProducer.java index 4be2ca17b15..3aefcdf60e6 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/AsyncContentProducer.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/AsyncContentProducer.java @@ -219,9 +219,10 @@ class AsyncContentProducer implements ContentProducer assertLocked(); if (LOG.isDebugEnabled()) LOG.debug("reclaim {} {}", chunk, this); - assert chunk == _chunk; + if (chunk != _chunk) + throw new IllegalArgumentException("Cannot reclaim unknown chunk"); chunk.release(); - _chunk = null; + _chunk = Content.Chunk.next(_chunk); } @Override @@ -270,6 +271,9 @@ class AsyncContentProducer implements ContentProducer return _servletChannel.getServletRequestState().isInputUnready(); } + /** + * Never returns an empty chunk that isn't a failure and/or last. + */ private Content.Chunk produceChunk() { if (LOG.isDebugEnabled()) @@ -309,13 +313,19 @@ class AsyncContentProducer implements ContentProducer LOG.debug("channel has no new chunk {}", this); return null; } - _servletChannel.getServletRequestState().onContentAdded(); } } } private Content.Chunk readChunk() { + if (_servletChannel.getServletRequestState().isInputUnready()) + { + if (LOG.isDebugEnabled()) + LOG.debug("readChunk() in unready state, returning null {}", this); + return null; + } + Content.Chunk chunk = _servletChannel.getRequest().read(); if (chunk != null) { diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ContentProducer.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ContentProducer.java index 025fd2d88b9..6297604d50f 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ContentProducer.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ContentProducer.java @@ -94,7 +94,7 @@ public interface ContentProducer * After this call, state can be either of UNREADY or IDLE. * * @return the next content chunk that can be read from or null if the implementation does not block - * and has no available content. + * and has no available content. The returned chunk can be empty IFF it is a failure and/or last. */ Content.Chunk nextChunk(); diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java index 6dfff5f3388..7b5d756b0d3 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java @@ -188,14 +188,14 @@ public class HttpInput extends ServletInputStream implements Runnable LOG.debug("setting read listener to {} {}", readListener, this); if (_readListener != null) throw new IllegalStateException("ReadListener already set"); - //illegal if async not started + // illegal if async not started if (!_channelState.isAsyncStarted()) throw new IllegalStateException("Async not started"); _readListener = Objects.requireNonNull(readListener); _contentProducer = _asyncContentProducer; // trigger content production - if (isReady() && _channelState.onReadEof()) // onReadEof b/c we want to transition from WAITING to WOKEN + if (isReady() && _channelState.onReadListenerReady()) // onReadListenerReady b/c we want to transition from WAITING to WOKEN scheduleReadListenerNotification(); // this is needed by AsyncServletIOTest.testStolenAsyncRead } @@ -244,6 +244,8 @@ public class HttpInput extends ServletInputStream implements Runnable Content.Chunk chunk = _contentProducer.nextChunk(); if (chunk == null) throw new IllegalStateException("read on unready input"); + + // Is it not empty? if (chunk.hasRemaining()) { int read = buffer == null ? get(chunk, b, off, len) : get(chunk, buffer); @@ -254,6 +256,7 @@ public class HttpInput extends ServletInputStream implements Runnable return read; } + // Is it a failure? if (Content.Chunk.isFailure(chunk)) { Throwable failure = chunk.getFailure(); @@ -264,10 +267,11 @@ public class HttpInput extends ServletInputStream implements Runnable throw new IOException(failure); } + // Empty and not a failure; can only be EOF as per ContentProducer.nextChunk() contract. if (LOG.isDebugEnabled()) LOG.debug("read at EOF, setting consumed EOF to true {}", this); _consumedEof = true; - // If EOF do we need to wake for allDataRead callback? + // Do we need to wake for allDataRead callback? if (onContentProducible()) scheduleReadListenerNotification(); return -1; @@ -276,6 +280,8 @@ public class HttpInput extends ServletInputStream implements Runnable private void scheduleReadListenerNotification() { + if (LOG.isDebugEnabled()) + LOG.debug("scheduling ReadListener notification {}", this); _servletChannel.execute(_servletChannel::handle); } diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java index 40a20921cc3..4a6ebb620d4 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java @@ -388,9 +388,10 @@ public class ServletChannel */ void recycle(Throwable x) { - _state.recycle(); + // _httpInput must be recycled before _state. _httpInput.recycle(); _httpOutput.recycle(); + _state.recycle(); _servletContextRequest = null; _request = null; _response = null; diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java index b74dd79ff23..0c419cba559 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.io.QuietException; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; @@ -102,12 +103,51 @@ public class ServletChannelState /* * The input readiness state. + *
+     *              read() without preceding
+     *              isReady()  ------
+     *                         \     \   unhandle() returns Action.READ_CALLBACK to call the ReadListener,
+     *                          \     \  or read() stole available content after setReadListener()
+     *                           --> IDLE <--------------
+     *     blocking read() unblocked  ^                  \
+     *                                |                   \
+     *                                |                    \  setReadListener() called while
+     *            registering demand  v                     v content is available
+     *                               UNREADY ------------> READY
+     *                                         demand
+     *                                         serviced
+     * 
*/ private enum InputState { - IDLE, // No isReady; No data - UNREADY, // isReady()==false; No data - READY // isReady() was false; data is available + /** + * The 'default' state, when there is no pending demand nor a pending notification to the ReadListener. + * There are 3 ways to transition to this state: + *
    + *
  • from IDLE: when an async read() is called without a preceding call to isReady()
  • + *
  • from READY: just before unhandle() returns Action.READ_CALLBACK to call read listener or + * when read() steals available content after setReadListener()
  • + *
  • from UNREADY: when a blocking read() got unblocked
  • + *
+ */ + IDLE, + + /** + * The 'demand registered' state. There is only 1 way to transition to this state: + *
    + *
  • from IDLE: when isReady() is called and there is no content available, so a demand is registered
  • + *
+ */ + UNREADY, + + /** + * The 'dispatch a notification to the ReadListener' state. There are 2 ways to transition to this state: + *
    + *
  • from IDLE: when setReadListener() is called while there is content available
  • + *
  • from UNREADY: when demand is serviced because content is now available
  • + *
+ */ + READY } /* @@ -1027,13 +1067,9 @@ public class ServletChannelState if (LOG.isDebugEnabled()) LOG.debug("completing {}", toStringLocked()); - switch (_requestState) - { - case COMPLETED: - throw new IllegalStateException(getStatusStringLocked()); - default: - _requestState = RequestState.COMPLETING; - } + if (_requestState == RequestState.COMPLETED) + throw new IllegalStateException(getStatusStringLocked()); + _requestState = RequestState.COMPLETING; } } @@ -1049,7 +1085,7 @@ public class ServletChannelState LOG.debug("completed {}", toStringLocked()); if (_requestState != RequestState.COMPLETING) - throw new IllegalStateException(this.getStatusStringLocked()); + failure = ExceptionUtil.combine(failure, new IllegalStateException(getStatusStringLocked())); if (failure != null) abortResponse(failure); @@ -1154,18 +1190,14 @@ public class ServletChannelState if (LOG.isDebugEnabled()) LOG.debug("upgrade {}", toStringLocked()); - switch (_state) - { - case IDLE: - break; - default: - throw new IllegalStateException(getStatusStringLocked()); - } + if (_state != State.IDLE) + throw new IllegalStateException(getStatusStringLocked()); + if (_inputState != InputState.IDLE) + throw new IllegalStateException(getStatusStringLocked()); _asyncListeners = null; _state = State.UPGRADED; _requestState = RequestState.BLOCKING; _initial = true; - _inputState = InputState.IDLE; _asyncWritePossible = false; _timeoutMs = DEFAULT_TIMEOUT; _event = null; @@ -1306,19 +1338,17 @@ public class ServletChannelState return woken; } - public boolean onReadEof() + public boolean onReadListenerReady() { boolean woken = false; try (AutoLock ignored = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("onReadEof {}", toStringLocked()); + LOG.debug("onReadListenerReady {}", toStringLocked()); switch (_inputState) { case IDLE: - case READY: - case UNREADY: _inputState = InputState.READY; if (_state == State.WAITING) { @@ -1327,6 +1357,8 @@ public class ServletChannelState } break; + case READY: + case UNREADY: default: throw new IllegalStateException(toStringLocked()); } @@ -1334,31 +1366,6 @@ public class ServletChannelState return woken; } - /** - * Called to indicate that some content was produced and is - * ready for consumption. - */ - public void onContentAdded() - { - try (AutoLock ignored = lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("onContentAdded {}", toStringLocked()); - - switch (_inputState) - { - case IDLE: - case UNREADY: - case READY: - _inputState = InputState.READY; - break; - - default: - throw new IllegalStateException(toStringLocked()); - } - } - } - /** * Called to indicate that the content is being consumed. */ @@ -1398,11 +1405,11 @@ public class ServletChannelState switch (_inputState) { case IDLE: - case UNREADY: - case READY: // READY->UNREADY is needed by AsyncServletIOTest.testStolenAsyncRead _inputState = InputState.UNREADY; break; + case READY: + case UNREADY: default: throw new IllegalStateException(toStringLocked()); } diff --git a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/AsyncServletIOTest.java b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/AsyncServletIOTest.java index 5e32c1fbd1a..0bfddcf20d9 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/AsyncServletIOTest.java +++ b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/AsyncServletIOTest.java @@ -26,8 +26,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.UnaryOperator; import jakarta.servlet.AsyncContext; import jakarta.servlet.AsyncEvent; @@ -41,14 +39,13 @@ import jakarta.servlet.WriteListener; import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpTester; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,6 +60,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,7 +74,6 @@ public class AsyncServletIOTest protected AsyncIOServlet4 _servlet4 = new AsyncIOServlet4(); protected StolenAsyncReadServlet _servletStolenAsyncRead = new StolenAsyncReadServlet(); protected int _port; - protected WrappingQTP _wQTP; protected Server _server; protected ServletHandler _servletHandler; protected ServerConnector _connector; @@ -84,8 +81,7 @@ public class AsyncServletIOTest @BeforeEach public void setUp() throws Exception { - _wQTP = new WrappingQTP(); - _server = new Server(_wQTP); + _server = new Server(); HttpConfiguration httpConfig = new HttpConfiguration(); httpConfig.setOutputBufferSize(4096); @@ -794,125 +790,78 @@ public class AsyncServletIOTest @Test public void testStolenAsyncRead() throws Exception { - StringBuilder request = new StringBuilder(512); - request.append("POST /ctx/stolen/info HTTP/1.1\r\n") - .append("Host: localhost\r\n") - .append("Content-Type: text/plain\r\n") - .append("Content-Length: 2\r\n") - .append("\r\n") - .append("1"); - int port = _port; - try (Socket socket = new Socket("localhost", port)) + String request = """ + POST /ctx/stolen/info HTTP/1.1 + Host: localhost + Content-Type: text/plain + Content-Length: 2 + + 1"""; + + try (Socket socket = new Socket("localhost", _port)) { socket.setSoTimeout(10000); OutputStream out = socket.getOutputStream(); - out.write(request.toString().getBytes(ISO_8859_1)); + out.write(request.getBytes(ISO_8859_1)); out.flush(); - // wait until server is ready - _servletStolenAsyncRead.ready.await(); - final CountDownLatch wait = new CountDownLatch(1); - final CountDownLatch held = new CountDownLatch(1); - // Stop any dispatches until we want them - - UnaryOperator old = _wQTP.wrapper.getAndSet(r -> - () -> - { - try - { - held.countDown(); - wait.await(); - r.run(); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - } - ); - - // We are an unrelated thread, let's mess with the input stream - ServletInputStream sin = _servletStolenAsyncRead.listener.in; - sin.setReadListener(_servletStolenAsyncRead.listener); - - // thread should be dispatched to handle, but held by our wQTP wait. - assertTrue(held.await(10, TimeUnit.SECONDS)); - - // Let's steal our read - assertTrue(sin.isReady()); - assertThat(sin.read(), Matchers.is((int)'1')); - assertFalse(sin.isReady()); - - // let the ODA call go - _wQTP.wrapper.set(old); - wait.countDown(); - - // ODA should not be called + // Because the read was stolen, onDataAvailable() is not called. + // The wait guarantees that the Servlet thread is out of doPost(). assertFalse(_servletStolenAsyncRead.oda.await(500, TimeUnit.MILLISECONDS)); - // Send some more data - out.write((int)'2'); + // Send some more data. + out.write('2'); out.flush(); - // ODA should now be called!! + // onDataAvailable() should now be called. assertTrue(_servletStolenAsyncRead.oda.await(500, TimeUnit.MILLISECONDS)); - // We can not read some more - assertTrue(sin.isReady()); - assertThat(sin.read(), Matchers.is((int)'2')); + ServletInputStream in = _servletStolenAsyncRead.listener.in; - // read EOF - assertTrue(sin.isReady()); - assertThat(sin.read(), Matchers.is(-1)); + // We can now read some more. + assertTrue(in.isReady()); + assertEquals('2', in.read()); - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + // All content has been sent, must read EOF. + assertTrue(in.isReady()); + assertEquals(-1, in.read()); - // response line - String line = in.readLine(); - LOG.debug("response-line: " + line); - assertThat(line, startsWith("HTTP/1.1 200 OK")); - - // Skip headers - while (line != null) - { - line = in.readLine(); - LOG.debug("header-line: " + line); - if (line.length() == 0) - break; - } + HttpTester.Response response = HttpTester.parseResponse(socket.getInputStream()); + assertNotNull(response); + assertEquals(200, response.getStatus()); } - - assertTrue(_servletStolenAsyncRead.completed.await(5, TimeUnit.SECONDS)); } - @SuppressWarnings("serial") - public class StolenAsyncReadServlet extends HttpServlet + public static class StolenAsyncReadServlet extends HttpServlet { - public CountDownLatch ready = new CountDownLatch(1); - public CountDownLatch oda = new CountDownLatch(1); - public CountDownLatch completed = new CountDownLatch(1); - public volatile StealingListener listener; + private final CountDownLatch oda = new CountDownLatch(1); + private volatile StealingListener listener; @Override public void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException { listener = new StealingListener(request); - ready.countDown(); + + // Steal the read. + assertEquals('1', listener.in.read()); + + // Make sure the ReadListener is called when more content is available. + assertFalse(listener.in.isReady()); + + // Exit from doPost() so that ReadListener methods can now be invoked. } - public class StealingListener implements ReadListener, AsyncListener + public class StealingListener implements ReadListener { - final HttpServletRequest request; - final ServletInputStream in; - final AsyncContext asyncContext; + private final ServletInputStream in; + private final AsyncContext asyncContext; - StealingListener(HttpServletRequest request) throws IOException + public StealingListener(HttpServletRequest request) throws IOException { asyncContext = request.startAsync(); - asyncContext.setTimeout(10000L); - asyncContext.addListener(this); - this.request = request; + asyncContext.setTimeout(0); in = request.getInputStream(); + in.setReadListener(this); } @Override @@ -922,51 +871,17 @@ public class AsyncServletIOTest } @Override - public void onAllDataRead() throws IOException + public void onAllDataRead() { asyncContext.complete(); } @Override - public void onError(final Throwable t) + public void onError(Throwable t) { t.printStackTrace(); asyncContext.complete(); } - - @Override - public void onComplete(final AsyncEvent event) - { - completed.countDown(); - } - - @Override - public void onTimeout(final AsyncEvent event) - { - asyncContext.complete(); - } - - @Override - public void onError(final AsyncEvent event) - { - asyncContext.complete(); - } - - @Override - public void onStartAsync(AsyncEvent event) - { - } - } - } - - private class WrappingQTP extends QueuedThreadPool - { - AtomicReference> wrapper = new AtomicReference<>(UnaryOperator.identity()); - - @Override - public void execute(Runnable job) - { - super.execute(wrapper.get().apply(job)); } } }