From 06b00931308c2f3ee6d433950828e09089bc9100 Mon Sep 17 00:00:00 2001 From: Christian Wahl Date: Tue, 28 Feb 2023 18:53:38 +0100 Subject: [PATCH] NIFI-11232 Fixed buffer handling in ContentClaimInputStream This closes #6996 Signed-off-by: David Handermann --- .../io/ContentClaimInputStream.java | 80 ++++++++------ .../io/TestContentClaimInputStream.java | 100 ++++++++++++++++-- 2 files changed, 136 insertions(+), 44 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java index 3ace1ac318..911580991f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java @@ -25,6 +25,7 @@ import org.apache.nifi.stream.io.StreamUtils; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; /** * An InputStream that is provided a Content Repository, Content Claim, and offset into the Content Claim where a FlowFile's @@ -81,14 +82,13 @@ public class ContentClaimInputStream extends InputStream { @Override public int read() throws IOException { - int value = -1; - if (bufferedIn != null) { + final int value; + if (bufferedIn == null) { + value = getDelegate().read(); + } else { value = bufferedIn.read(); } - if (value < 0) { - value = getDelegate().read(); - } if (value != -1) { bytesConsumed++; currentOffset++; @@ -99,12 +99,11 @@ public class ContentClaimInputStream extends InputStream { @Override public int read(final byte[] b) throws IOException { - int count = -1; - if (bufferedIn != null) { - count = bufferedIn.read(b); - } - if (count < 0) { + final int count; + if (bufferedIn == null) { count = getDelegate().read(b); + } else { + count = bufferedIn.read(b); } if (count != -1) { @@ -117,12 +116,11 @@ public class ContentClaimInputStream extends InputStream { @Override public int read(final byte[] b, final int off, final int len) throws IOException { - int count = -1; - if (bufferedIn != null) { - count = bufferedIn.read(b, off, len); - } - if (count < 0) { + final int count; + if (bufferedIn == null) { count = getDelegate().read(b, off, len); + } else { + count = bufferedIn.read(b, off, len); } if (count != -1) { @@ -158,26 +156,47 @@ public class ContentClaimInputStream extends InputStream { return true; } + /** + * Marks the current position. Can be returned to with {@code reset()}. + * + * @param readLimit hint on how much data should be buffered. + * @see ContentClaimInputStream#reset() + */ @Override - public void mark(final int readlimit) { + public void mark(final int readLimit) { markOffset = currentOffset; - markReadLimit = readlimit; - if (bufferedIn != null) { - bufferedIn.mark(readlimit); + markReadLimit = readLimit; + if (bufferedIn == null) { + try { + bufferedIn = new BufferedInputStream(getDelegate()); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to read repository Content Claim", e); + } } + + bufferedIn.mark(readLimit); } + /** + * Resets to the last marked position. + * + * @throws IOException Thrown when a mark position is not set or on other stream handling failures + * @see ContentClaimInputStream#mark(int) + */ @Override public void reset() throws IOException { if (markOffset < 0) { throw new IOException("Stream has not been marked"); } - if (bufferedIn != null && bytesConsumed <= markReadLimit) { - bufferedIn.reset(); - currentOffset = markOffset; - - return; + if (bufferedIn != null) { + if ((currentOffset - markOffset) <= markReadLimit) { + bufferedIn.reset(); + currentOffset = markOffset; + return; + } + // we read over the limit and need to throw away the buffer + bufferedIn = null; } if (currentOffset != markOffset) { @@ -200,6 +219,10 @@ public class ContentClaimInputStream extends InputStream { @Override public void close() throws IOException { + if (bufferedIn != null) { + bufferedIn.close(); + } + if (delegate != null) { delegate.close(); } @@ -215,15 +238,6 @@ public class ContentClaimInputStream extends InputStream { delegate = new PerformanceTrackingInputStream(contentRepository.read(contentClaim), performanceTracker); StreamUtils.skip(delegate, claimOffset); currentOffset = claimOffset; - - if (markReadLimit > 0) { - final int limitLeft = (int) (markReadLimit - currentOffset); - if (limitLeft > 0) { - final InputStream limitedIn = new LimitedInputStream(delegate, limitLeft); - bufferedIn = new BufferedInputStream(limitedIn); - bufferedIn.mark(limitLeft); - } - } } finally { performanceTracker.endContentRead(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java index fe4f6b9e99..a250b9b7f5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +63,7 @@ public class TestContentClaimInputStream { final byte[] buff = new byte[5]; StreamUtils.fillBuffer(in, buff); - Mockito.verify(repo, Mockito.times(1)).read(contentClaim); + Mockito.verify(repo).read(contentClaim); Mockito.verifyNoMoreInteractions(repo); final String contentRead = new String(buff); @@ -84,7 +85,7 @@ public class TestContentClaimInputStream { final byte[] buff = new byte[2]; StreamUtils.fillBuffer(in, buff); - Mockito.verify(repo, Mockito.times(1)).read(contentClaim); + Mockito.verify(repo).read(contentClaim); Mockito.verifyNoMoreInteractions(repo); final String contentRead = new String(buff); @@ -106,7 +107,7 @@ public class TestContentClaimInputStream { final byte[] buff = new byte[5]; final int invocations = 10; - for (int i=0; i < invocations; i++) { + for (int i = 0; i < invocations; i++) { in.mark(5); StreamUtils.fillBuffer(in, buff, true); @@ -114,14 +115,14 @@ public class TestContentClaimInputStream { final String contentRead = new String(buff); assertEquals("hello", contentRead); - assertEquals(5 * (i+1), in.getBytesConsumed()); + assertEquals(5 * (i + 1), in.getBytesConsumed()); assertEquals(5, in.getCurrentOffset()); assertEquals(-1, in.read()); in.reset(); } - Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read + Mockito.verify(repo).read(contentClaim); Mockito.verifyNoMoreInteractions(repo); // Ensure that underlying stream is closed @@ -139,20 +140,20 @@ public class TestContentClaimInputStream { final int invocations = 10; in.mark(5); - for (int i=0; i < invocations; i++) { + for (int i = 0; i < invocations; i++) { StreamUtils.fillBuffer(in, buff, true); final String contentRead = new String(buff); assertEquals("hello", contentRead); - assertEquals(5 * (i+1), in.getBytesConsumed()); + assertEquals(5 * (i + 1), in.getBytesConsumed()); assertEquals(5, in.getCurrentOffset()); assertEquals(-1, in.read()); in.reset(); } - Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read + Mockito.verify(repo).read(contentClaim); Mockito.verifyNoMoreInteractions(repo); // Ensure that underlying stream is closed @@ -168,7 +169,7 @@ public class TestContentClaimInputStream { final byte[] buff = new byte[2]; final int invocations = 10; - for (int i=0; i < invocations; i++) { + for (int i = 0; i < invocations; i++) { in.mark(5); StreamUtils.fillBuffer(in, buff, true); @@ -176,18 +177,95 @@ public class TestContentClaimInputStream { final String contentRead = new String(buff); assertEquals("lo", contentRead); - assertEquals(2 * (i+1), in.getBytesConsumed()); + assertEquals(2 * (i + 1), in.getBytesConsumed()); assertEquals(5, in.getCurrentOffset()); assertEquals(-1, in.read()); in.reset(); } - Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read + Mockito.verify(repo).read(contentClaim); Mockito.verifyNoMoreInteractions(repo); // Ensure that underlying stream is closed in.close(); assertTrue(closed.get()); } + + + @Test + public void testRereadBiggerThanBuffer() throws IOException { + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker()); + + final byte[] buff = new byte[5]; + + final int invocations = 10; + in.mark(2); + + for (int i = 0; i < invocations; i++) { + StreamUtils.fillBuffer(in, buff, true); + + final String contentRead = new String(buff); + assertEquals("hello", contentRead); + + assertEquals(5 * (i + 1), in.getBytesConsumed()); + assertEquals(5, in.getCurrentOffset()); + assertEquals(-1, in.read()); + + in.reset(); + } + + Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); + Mockito.verifyNoMoreInteractions(repo); + + // Ensure that underlying stream is closed + in.close(); + assertTrue(closed.get()); + } + + + @Test + public void testBigReadAfterSmallRereads() throws IOException { + // this has to be bigger than the default buffer size of BufferedInputStream + final int bigReadSize = 65_000; + + final byte[] source = new byte[bigReadSize]; + + Mockito.when(repo.read(contentClaim)).thenAnswer(invocation -> { + ByteArrayInputStream is = new ByteArrayInputStream(source) { + @Override + public void close() throws IOException { + super.close(); + closed.set(true); + } + }; + // wrap it, because ByteArrayInputStream throws no exception when being read after a close + return new BufferedInputStream(is); + }); + + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 100L, repo.read(contentClaim), new NopPerformanceTracker()); + + int invocations = 5; + for (int i = 0; i < invocations; i++) { + in.mark(1); + in.read(); + + assertEquals(i + 1, in.getBytesConsumed()); + assertEquals(101, in.getCurrentOffset()); + + in.reset(); + } + + byte[] buff = new byte[bigReadSize]; + // Force the buffer to read from the delegate stream by reading all the data and therefore + // going over the default buffer size. + in.read(buff); + in.reset(); + + Mockito.verify(repo, Mockito.times(2)).read(contentClaim); + Mockito.verifyNoMoreInteractions(repo); + + in.close(); + assertTrue(closed.get()); + } }