diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 1ff63c545b..2d09ea5b89 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -56,6 +56,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.MissingFlowFileException; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; @@ -65,6 +66,8 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.ObjectHolder; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -235,6 +238,105 @@ public class TestStandardProcessSession { assertEquals(0, contentRepo.getExistingClaims().size()); } + @Test(expected = FlowFileAccessException.class) + public void testAppendAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder outputStreamHolder = new ObjectHolder<>(null); + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream outputStream) throws IOException { + outputStreamHolder.set(outputStream); + } + }); + try (final OutputStream outputStream = outputStreamHolder.get()) { + outputStream.write(5); + } + } + + @Test(expected = FlowFileAccessException.class) + public void testReadAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder inputStreamHolder = new ObjectHolder<>(null); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream inputStream) throws IOException { + inputStreamHolder.set(inputStream); + } + }); + try (final InputStream inputStream = inputStreamHolder.get()) { + inputStream.read(); + } + } + + @Test + public void testStreamAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder inputStreamHolder = new ObjectHolder<>(null); + final ObjectHolder outputStreamHolder = new ObjectHolder<>(null); + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream input, final OutputStream output) throws IOException { + inputStreamHolder.set(input); + outputStreamHolder.set(output); + } + }); + try (final InputStream inputStream = inputStreamHolder.get()) { + inputStream.read(); + Assert.fail("Expected Exception to be thrown when read is attempted after session closes stream"); + } catch (final Exception ex) {} + try (final OutputStream outputStream = outputStreamHolder.get()) { + outputStream.write(5); + Assert.fail("Expected Exception to be thrown when write is attempted after session closes stream"); + } catch (final Exception ex) {} + } + + @Test(expected = FlowFileAccessException.class) + public void testWriteAfterSessionClosesStream() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + final ObjectHolder outputStreamHolder = new ObjectHolder<>(null); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + outputStreamHolder.set(out); + } + }); + try (final OutputStream outputStream = outputStreamHolder.get()) { + outputStream.write(5); + } + } + @Test public void testCreateThenRollbackRemovesContent() throws IOException { @@ -998,6 +1100,12 @@ public class TestStandardProcessSession { public ContentClaim create(boolean lossTolerant) throws IOException { final ContentClaim claim = claimManager.newContentClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); claimantCounts.put(claim, new AtomicInteger(1)); + final Path path = getPath(claim); + final Path parent = path.getParent(); + if (Files.exists(parent) == false) { + Files.createDirectories(parent); + } + Files.createFile(getPath(claim)); return claim; }