NIFI-11232 Fixed buffer handling in ContentClaimInputStream

This closes #6996

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Christian Wahl 2023-02-28 18:53:38 +01:00 committed by exceptionfactory
parent fe2721786c
commit 6bd893da16
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 136 additions and 44 deletions

View File

@ -25,6 +25,7 @@ import org.apache.nifi.stream.io.StreamUtils;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.UncheckedIOException;
/** /**
* An InputStream that is provided a Content Repository, Content Claim, and offset into the Content Claim where a FlowFile's * An InputStream that is provided a Content Repository, Content Claim, and offset into the Content Claim where a FlowFile's
@ -81,14 +82,13 @@ public class ContentClaimInputStream extends InputStream {
@Override @Override
public int read() throws IOException { public int read() throws IOException {
int value = -1; final int value;
if (bufferedIn != null) { if (bufferedIn == null) {
value = getDelegate().read();
} else {
value = bufferedIn.read(); value = bufferedIn.read();
} }
if (value < 0) {
value = getDelegate().read();
}
if (value != -1) { if (value != -1) {
bytesConsumed++; bytesConsumed++;
currentOffset++; currentOffset++;
@ -99,12 +99,11 @@ public class ContentClaimInputStream extends InputStream {
@Override @Override
public int read(final byte[] b) throws IOException { public int read(final byte[] b) throws IOException {
int count = -1; final int count;
if (bufferedIn != null) { if (bufferedIn == null) {
count = bufferedIn.read(b);
}
if (count < 0) {
count = getDelegate().read(b); count = getDelegate().read(b);
} else {
count = bufferedIn.read(b);
} }
if (count != -1) { if (count != -1) {
@ -117,12 +116,11 @@ public class ContentClaimInputStream extends InputStream {
@Override @Override
public int read(final byte[] b, final int off, final int len) throws IOException { public int read(final byte[] b, final int off, final int len) throws IOException {
int count = -1; final int count;
if (bufferedIn != null) { if (bufferedIn == null) {
count = bufferedIn.read(b, off, len);
}
if (count < 0) {
count = getDelegate().read(b, off, len); count = getDelegate().read(b, off, len);
} else {
count = bufferedIn.read(b, off, len);
} }
if (count != -1) { if (count != -1) {
@ -158,27 +156,48 @@ public class ContentClaimInputStream extends InputStream {
return true; return true;
} }
/**
* Marks the current position. Can be returned to with {@code reset()}.
*
* @param readLimit hint on how much data should be buffered.
* @see ContentClaimInputStream#reset()
*/
@Override @Override
public void mark(final int readlimit) { public void mark(final int readLimit) {
markOffset = currentOffset; markOffset = currentOffset;
markReadLimit = readlimit; markReadLimit = readLimit;
if (bufferedIn != null) { if (bufferedIn == null) {
bufferedIn.mark(readlimit); try {
bufferedIn = new BufferedInputStream(getDelegate());
} catch (final IOException e) {
throw new UncheckedIOException("Failed to read repository Content Claim", e);
} }
} }
bufferedIn.mark(readLimit);
}
/**
* Resets to the last marked position.
*
* @throws IOException Thrown when a mark position is not set or on other stream handling failures
* @see ContentClaimInputStream#mark(int)
*/
@Override @Override
public void reset() throws IOException { public void reset() throws IOException {
if (markOffset < 0) { if (markOffset < 0) {
throw new IOException("Stream has not been marked"); throw new IOException("Stream has not been marked");
} }
if (bufferedIn != null && bytesConsumed <= markReadLimit) { if (bufferedIn != null) {
if ((currentOffset - markOffset) <= markReadLimit) {
bufferedIn.reset(); bufferedIn.reset();
currentOffset = markOffset; currentOffset = markOffset;
return; return;
} }
// we read over the limit and need to throw away the buffer
bufferedIn = null;
}
if (currentOffset != markOffset) { if (currentOffset != markOffset) {
if (delegate != null) { if (delegate != null) {
@ -200,6 +219,10 @@ public class ContentClaimInputStream extends InputStream {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (bufferedIn != null) {
bufferedIn.close();
}
if (delegate != null) { if (delegate != null) {
delegate.close(); delegate.close();
} }
@ -215,15 +238,6 @@ public class ContentClaimInputStream extends InputStream {
delegate = new PerformanceTrackingInputStream(contentRepository.read(contentClaim), performanceTracker); delegate = new PerformanceTrackingInputStream(contentRepository.read(contentClaim), performanceTracker);
StreamUtils.skip(delegate, claimOffset); StreamUtils.skip(delegate, claimOffset);
currentOffset = claimOffset; currentOffset = claimOffset;
if (markReadLimit > 0) {
final int limitLeft = (int) (markReadLimit - currentOffset);
if (limitLeft > 0) {
final InputStream limitedIn = new LimitedInputStream(delegate, limitLeft);
bufferedIn = new BufferedInputStream(limitedIn);
bufferedIn.mark(limitLeft);
}
}
} finally { } finally {
performanceTracker.endContentRead(); performanceTracker.endContentRead();
} }

View File

@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -62,7 +63,7 @@ public class TestContentClaimInputStream {
final byte[] buff = new byte[5]; final byte[] buff = new byte[5];
StreamUtils.fillBuffer(in, buff); StreamUtils.fillBuffer(in, buff);
Mockito.verify(repo, Mockito.times(1)).read(contentClaim); Mockito.verify(repo).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo); Mockito.verifyNoMoreInteractions(repo);
final String contentRead = new String(buff); final String contentRead = new String(buff);
@ -84,7 +85,7 @@ public class TestContentClaimInputStream {
final byte[] buff = new byte[2]; final byte[] buff = new byte[2];
StreamUtils.fillBuffer(in, buff); StreamUtils.fillBuffer(in, buff);
Mockito.verify(repo, Mockito.times(1)).read(contentClaim); Mockito.verify(repo).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo); Mockito.verifyNoMoreInteractions(repo);
final String contentRead = new String(buff); final String contentRead = new String(buff);
@ -121,7 +122,7 @@ public class TestContentClaimInputStream {
in.reset(); in.reset();
} }
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read Mockito.verify(repo).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo); Mockito.verifyNoMoreInteractions(repo);
// Ensure that underlying stream is closed // Ensure that underlying stream is closed
@ -152,7 +153,7 @@ public class TestContentClaimInputStream {
in.reset(); in.reset();
} }
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read Mockito.verify(repo).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo); Mockito.verifyNoMoreInteractions(repo);
// Ensure that underlying stream is closed // Ensure that underlying stream is closed
@ -183,11 +184,88 @@ public class TestContentClaimInputStream {
in.reset(); in.reset();
} }
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read Mockito.verify(repo).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo); Mockito.verifyNoMoreInteractions(repo);
// Ensure that underlying stream is closed // Ensure that underlying stream is closed
in.close(); in.close();
assertTrue(closed.get()); assertTrue(closed.get());
} }
@Test
public void testRereadBiggerThanBuffer() throws IOException {
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker());
final byte[] buff = new byte[5];
final int invocations = 10;
in.mark(2);
for (int i = 0; i < invocations; i++) {
StreamUtils.fillBuffer(in, buff, true);
final String contentRead = new String(buff);
assertEquals("hello", contentRead);
assertEquals(5 * (i + 1), in.getBytesConsumed());
assertEquals(5, in.getCurrentOffset());
assertEquals(-1, in.read());
in.reset();
}
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo);
// Ensure that underlying stream is closed
in.close();
assertTrue(closed.get());
}
@Test
public void testBigReadAfterSmallRereads() throws IOException {
// this has to be bigger than the default buffer size of BufferedInputStream
final int bigReadSize = 65_000;
final byte[] source = new byte[bigReadSize];
Mockito.when(repo.read(contentClaim)).thenAnswer(invocation -> {
ByteArrayInputStream is = new ByteArrayInputStream(source) {
@Override
public void close() throws IOException {
super.close();
closed.set(true);
}
};
// wrap it, because ByteArrayInputStream throws no exception when being read after a close
return new BufferedInputStream(is);
});
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 100L, repo.read(contentClaim), new NopPerformanceTracker());
int invocations = 5;
for (int i = 0; i < invocations; i++) {
in.mark(1);
in.read();
assertEquals(i + 1, in.getBytesConsumed());
assertEquals(101, in.getCurrentOffset());
in.reset();
}
byte[] buff = new byte[bigReadSize];
// Force the buffer to read from the delegate stream by reading all the data and therefore
// going over the default buffer size.
in.read(buff);
in.reset();
Mockito.verify(repo, Mockito.times(2)).read(contentClaim);
Mockito.verifyNoMoreInteractions(repo);
in.close();
assertTrue(closed.get());
}
} }