mirror of
https://github.com/apache/nifi.git
synced 2025-02-10 03:55:22 +00:00
NIFI-5200: Fixed issue with InputStream being closed when calling ProcessSession.read() twice against sequential Content Claims
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2753
This commit is contained in:
parent
4bccab7e05
commit
00a63d17af
@ -2141,14 +2141,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||||||
|
|
||||||
currentReadClaim = claim;
|
currentReadClaim = claim;
|
||||||
|
|
||||||
// Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
|
currentReadClaimStream = new ByteCountingInputStream(rawInStream);
|
||||||
// reuse the same InputStream for the next FlowFile
|
|
||||||
final InputStream disableOnClose = new DisableOnCloseInputStream(rawInStream);
|
|
||||||
|
|
||||||
currentReadClaimStream = new ByteCountingInputStream(disableOnClose);
|
|
||||||
StreamUtils.skip(currentReadClaimStream, offset);
|
StreamUtils.skip(currentReadClaimStream, offset);
|
||||||
|
|
||||||
return currentReadClaimStream;
|
// Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
|
||||||
|
// reuse the same InputStream for the next FlowFile
|
||||||
|
final InputStream disableOnClose = new DisableOnCloseInputStream(currentReadClaimStream);
|
||||||
|
|
||||||
|
return disableOnClose;
|
||||||
} else {
|
} else {
|
||||||
claimCache.flush(claim);
|
claimCache.flush(claim);
|
||||||
final InputStream rawInStream = context.getContentRepository().read(claim);
|
final InputStream rawInStream = context.getContentRepository().read(claim);
|
||||||
|
@ -349,6 +349,25 @@ public class TestStandardProcessSession {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSequentialReads() throws IOException {
|
||||||
|
FlowFile ff1 = session.write(session.create(), out -> out.write(new byte[] {'A', 'B'}));
|
||||||
|
FlowFile ff2 = session.write(session.create(), out -> out.write('C'));
|
||||||
|
|
||||||
|
final byte[] buff1 = new byte[2];
|
||||||
|
try (final InputStream in = session.read(ff1)) {
|
||||||
|
StreamUtils.fillBuffer(in, buff1);
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] buff2 = new byte[1];
|
||||||
|
try (final InputStream in = session.read(ff2)) {
|
||||||
|
StreamUtils.fillBuffer(in, buff2);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertArrayEquals(new byte[] {'A', 'B'}, buff1);
|
||||||
|
Assert.assertArrayEquals(new byte[] {'C'}, buff2);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCloneOriginalDataLarger() throws IOException {
|
public void testCloneOriginalDataLarger() throws IOException {
|
||||||
final byte[] originalContent = "hello there 12345".getBytes();
|
final byte[] originalContent = "hello there 12345".getBytes();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user