From 4ae70db8e3538dcd6c5221c43d3729f5df7bcebf Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 19 Sep 2023 13:25:41 +0200 Subject: [PATCH] fixed #10513 ContentSourceInputStream close with available (#10525) Address #10513 ContentSourceInputStream close by making it do a single read looking for EOF If any content is skipped, then it is an abnormal close. use Chunk.next in read --- .../io/content/ContentSourceInputStream.java | 40 ++++++-- .../eclipse/jetty/io/ContentSourceTest.java | 97 +++++++++++++++++++ 2 files changed, 130 insertions(+), 7 deletions(-) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java index c35a29828ee..d25137481cf 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java @@ -57,7 +57,7 @@ public class ContentSourceInputStream extends InputStream if (Content.Chunk.isFailure(chunk)) { Content.Chunk c = chunk; - chunk = null; + chunk = Content.Chunk.next(c); throw IO.rethrow(c.getFailure()); } @@ -98,23 +98,49 @@ public class ContentSourceInputStream extends InputStream } @Override - public void close() + public void close() throws IOException { // If we have already reached a real EOF or a persistent failure, close is a noop. if (chunk == Content.Chunk.EOF || Content.Chunk.isFailure(chunk, true)) return; + boolean contentSkipped = false; + // If we have a chunk here, then it needs to be released if (chunk != null) { + contentSkipped = chunk.hasRemaining(); chunk.release(); - - // if the chunk was a last chunk (but not an instanceof EOF), then nothing more to do - if (chunk.isLast()) - return; + chunk = Content.Chunk.next(chunk); } - // This is an abnormal close before EOF + // If we don't have a chunk and have not skipped content, try one read looking for EOF + if (!contentSkipped && chunk == null) + { + chunk = content.read(); + + // If we read a chunk + if (chunk != null) + { + // Handle a failure as read would + if (Content.Chunk.isFailure(chunk)) + { + Content.Chunk c = chunk; + chunk = Content.Chunk.next(c); + throw IO.rethrow(c.getFailure()); + } + + contentSkipped = chunk.hasRemaining(); + chunk.release(); + chunk = Content.Chunk.next(chunk); + } + } + + // if we are now really at EOF without skipping content, then nothing more to do + if (!contentSkipped && chunk != null && chunk.isLast()) + return; + + // Otherwise this is an abnormal close before EOF Throwable closed = new IOException("closed before EOF"); chunk = Content.Chunk.from(closed); content.fail(closed); diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java index 7b1868d0fc5..45799c4f1fc 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ContentSourceTest.java @@ -49,6 +49,7 @@ import org.eclipse.jetty.util.IO; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.MatcherAssert.assertThat; @@ -511,6 +512,102 @@ public class ContentSourceTest assertThat(out.toString(UTF_8), equalTo("hello")); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testInputStreamCloseWithAvailableEOF(boolean eofAvailable) throws Exception + { + AtomicReference failed = new AtomicReference<>(); + TestContentSource source = new TestContentSource() + { + @Override + public void fail(Throwable failure) + { + failed.set(failure); + } + }; + + InputStream in = Content.Source.asInputStream(source); + source.add("hello", false); + AtomicReference throwable = new AtomicReference<>(); + CountDownLatch complete = new CountDownLatch(1); + new Thread(() -> + { + try + { + byte[] buffer = new byte[5]; + assertThat(in.read(buffer), is(5)); + String input = new String(buffer, StandardCharsets.ISO_8859_1); + assertThat(input, is("hello")); + if (eofAvailable) + source.add(Content.Chunk.EOF); + in.close(); + } + catch (Throwable t) + { + throwable.set(t); + } + finally + { + complete.countDown(); + } + }).start(); + + Runnable todo = source.takeDemand(); + assertNull(todo); + assertTrue(complete.await(10, TimeUnit.SECONDS)); + assertNull(throwable.get()); + if (eofAvailable) + assertNull(failed.get()); + else + assertThat(failed.get(), instanceOf(IOException.class)); + } + + @Test + public void testInputStreamCloseWithContentAvailable() throws Exception + { + AtomicReference failed = new AtomicReference<>(); + TestContentSource source = new TestContentSource() + { + @Override + public void fail(Throwable failure) + { + failed.set(failure); + } + }; + + InputStream in = Content.Source.asInputStream(source); + source.add("hello", false); + AtomicReference throwable = new AtomicReference<>(); + CountDownLatch complete = new CountDownLatch(1); + new Thread(() -> + { + try + { + byte[] buffer = new byte[5]; + assertThat(in.read(buffer), is(5)); + String input = new String(buffer, StandardCharsets.ISO_8859_1); + assertThat(input, is("hello")); + source.add("extra", false); + source.add(Content.Chunk.EOF); + in.close(); + } + catch (Throwable t) + { + throwable.set(t); + } + finally + { + complete.countDown(); + } + }).start(); + + Runnable todo = source.takeDemand(); + assertNull(todo); + assertTrue(complete.await(10, TimeUnit.SECONDS)); + assertNull(throwable.get()); + assertThat(failed.get(), instanceOf(IOException.class)); + } + private static class TestContentSource implements Content.Source { private final AtomicReference _demand = new AtomicReference<>();