fixed #10513 ContentSourceInputStream close with available (#10525)

Address #10513 ContentSourceInputStream close by making it do a single read looking for EOF
If any content is skipped, then it is an abnormal close.
use Chunk.next in read
This commit is contained in:
Greg Wilkins 2023-09-19 13:25:41 +02:00 committed by GitHub
parent a3adb66a7b
commit 4ae70db8e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 130 additions and 7 deletions

View File

@ -57,7 +57,7 @@ public class ContentSourceInputStream extends InputStream
if (Content.Chunk.isFailure(chunk)) if (Content.Chunk.isFailure(chunk))
{ {
Content.Chunk c = chunk; Content.Chunk c = chunk;
chunk = null; chunk = Content.Chunk.next(c);
throw IO.rethrow(c.getFailure()); throw IO.rethrow(c.getFailure());
} }
@ -98,23 +98,49 @@ public class ContentSourceInputStream extends InputStream
} }
@Override @Override
public void close() public void close() throws IOException
{ {
// If we have already reached a real EOF or a persistent failure, close is a noop. // If we have already reached a real EOF or a persistent failure, close is a noop.
if (chunk == Content.Chunk.EOF || Content.Chunk.isFailure(chunk, true)) if (chunk == Content.Chunk.EOF || Content.Chunk.isFailure(chunk, true))
return; return;
boolean contentSkipped = false;
// If we have a chunk here, then it needs to be released // If we have a chunk here, then it needs to be released
if (chunk != null) if (chunk != null)
{ {
contentSkipped = chunk.hasRemaining();
chunk.release(); chunk.release();
chunk = Content.Chunk.next(chunk);
// if the chunk was a last chunk (but not an instanceof EOF), then nothing more to do
if (chunk.isLast())
return;
} }
// This is an abnormal close before EOF // If we don't have a chunk and have not skipped content, try one read looking for EOF
if (!contentSkipped && chunk == null)
{
chunk = content.read();
// If we read a chunk
if (chunk != null)
{
// Handle a failure as read would
if (Content.Chunk.isFailure(chunk))
{
Content.Chunk c = chunk;
chunk = Content.Chunk.next(c);
throw IO.rethrow(c.getFailure());
}
contentSkipped = chunk.hasRemaining();
chunk.release();
chunk = Content.Chunk.next(chunk);
}
}
// if we are now really at EOF without skipping content, then nothing more to do
if (!contentSkipped && chunk != null && chunk.isLast())
return;
// Otherwise this is an abnormal close before EOF
Throwable closed = new IOException("closed before EOF"); Throwable closed = new IOException("closed before EOF");
chunk = Content.Chunk.from(closed); chunk = Content.Chunk.from(closed);
content.fail(closed); content.fail(closed);

View File

@ -49,6 +49,7 @@ import org.eclipse.jetty.util.IO;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -511,6 +512,102 @@ public class ContentSourceTest
assertThat(out.toString(UTF_8), equalTo("hello")); assertThat(out.toString(UTF_8), equalTo("hello"));
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInputStreamCloseWithAvailableEOF(boolean eofAvailable) throws Exception
{
AtomicReference<Throwable> failed = new AtomicReference<>();
TestContentSource source = new TestContentSource()
{
@Override
public void fail(Throwable failure)
{
failed.set(failure);
}
};
InputStream in = Content.Source.asInputStream(source);
source.add("hello", false);
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch complete = new CountDownLatch(1);
new Thread(() ->
{
try
{
byte[] buffer = new byte[5];
assertThat(in.read(buffer), is(5));
String input = new String(buffer, StandardCharsets.ISO_8859_1);
assertThat(input, is("hello"));
if (eofAvailable)
source.add(Content.Chunk.EOF);
in.close();
}
catch (Throwable t)
{
throwable.set(t);
}
finally
{
complete.countDown();
}
}).start();
Runnable todo = source.takeDemand();
assertNull(todo);
assertTrue(complete.await(10, TimeUnit.SECONDS));
assertNull(throwable.get());
if (eofAvailable)
assertNull(failed.get());
else
assertThat(failed.get(), instanceOf(IOException.class));
}
@Test
public void testInputStreamCloseWithContentAvailable() throws Exception
{
AtomicReference<Throwable> failed = new AtomicReference<>();
TestContentSource source = new TestContentSource()
{
@Override
public void fail(Throwable failure)
{
failed.set(failure);
}
};
InputStream in = Content.Source.asInputStream(source);
source.add("hello", false);
AtomicReference<Throwable> throwable = new AtomicReference<>();
CountDownLatch complete = new CountDownLatch(1);
new Thread(() ->
{
try
{
byte[] buffer = new byte[5];
assertThat(in.read(buffer), is(5));
String input = new String(buffer, StandardCharsets.ISO_8859_1);
assertThat(input, is("hello"));
source.add("extra", false);
source.add(Content.Chunk.EOF);
in.close();
}
catch (Throwable t)
{
throwable.set(t);
}
finally
{
complete.countDown();
}
}).start();
Runnable todo = source.takeDemand();
assertNull(todo);
assertTrue(complete.await(10, TimeUnit.SECONDS));
assertNull(throwable.get());
assertThat(failed.get(), instanceOf(IOException.class));
}
private static class TestContentSource implements Content.Source private static class TestContentSource implements Content.Source
{ {
private final AtomicReference<Runnable> _demand = new AtomicReference<>(); private final AtomicReference<Runnable> _demand = new AtomicReference<>();