From 00a63d17af3c82727b9119acb00fccfcf6639fc5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 1 Jun 2018 11:14:56 -0400 Subject: [PATCH] NIFI-5200: Fixed issue with InputStream being closed when calling ProcessSession.read() twice against sequential Content Claims Signed-off-by: Matthew Burgess This closes #2753 --- .../repository/StandardProcessSession.java | 12 ++++++------ .../TestStandardProcessSession.java | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 2750db61e7..12bcafdd2e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2141,14 +2141,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE currentReadClaim = claim; - // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can - // reuse the same InputStream for the next FlowFile - final InputStream disableOnClose = new DisableOnCloseInputStream(rawInStream); - - currentReadClaimStream = new ByteCountingInputStream(disableOnClose); + currentReadClaimStream = new ByteCountingInputStream(rawInStream); StreamUtils.skip(currentReadClaimStream, offset); - return currentReadClaimStream; + // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can + // reuse the same InputStream for the next FlowFile + final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream); + + return disableOnClose; } else { claimCache.flush(claim); final InputStream rawInStream = context.getContentRepository().read(claim); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 55fa23210a..f47be4d922 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -349,6 +349,25 @@ public class TestStandardProcessSession { }); } + @Test + public void testSequentialReads() throws IOException { + FlowFile ff1 = session.write(session.create(), out -> out.write(new byte[] {'A', 'B'})); + FlowFile ff2 = session.write(session.create(), out -> out.write('C')); + + final byte[] buff1 = new byte[2]; + try (final InputStream in = session.read(ff1)) { + StreamUtils.fillBuffer(in, buff1); + } + + final byte[] buff2 = new byte[1]; + try (final InputStream in = session.read(ff2)) { + StreamUtils.fillBuffer(in, buff2); + } + + Assert.assertArrayEquals(new byte[] {'A', 'B'}, buff1); + Assert.assertArrayEquals(new byte[] {'C'}, buff2); + } + @Test public void testCloneOriginalDataLarger() throws IOException { final byte[] originalContent = "hello there 12345".getBytes();