diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 426775e4af..062e515906 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2340,18 +2340,49 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public void exportTo(final FlowFile source, final OutputStream destination) { validateRecordState(source); final StandardRepositoryRecord record = records.get(source); + + if(record.getCurrentClaim() == null) { + return; + } + try { - if (record.getCurrentClaim() == null) { - return; + ensureNotAppending(record.getCurrentClaim()); + } catch (final IOException e) { + throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); + } + + try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); + final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { + + // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from + // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository + // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any + // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it + // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it. + final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); + boolean cnfeThrown = false; + + try { + recursionSet.add(source); + StreamUtils.copy(ffais, destination, source.getSize()); + } catch (final ContentNotFoundException cnfe) { + cnfeThrown = true; + throw cnfe; + } finally { + recursionSet.remove(source); + IOUtils.closeQuietly(ffais); + // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate. + if (!cnfeThrown && ffais.getContentNotFoundException() != null) { + throw ffais.getContentNotFoundException(); + } } - ensureNotAppending(record.getCurrentClaim()); - final long size = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, record.getCurrentClaimOffset(), source.getSize()); - bytesRead.increment(size); } catch (final ContentNotFoundException nfe) { handleContentNotFound(nfe, record); - } catch (final Throwable t) { - throw new FlowFileAccessException("Failed to export " + source + " to " + destination + " due to " + t.toString(), t); + } catch (final IOException ex) { + throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ex.toString(), ex); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index e418fa4f8b..4ae20802ea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -21,8 +21,12 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.notNull; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -324,6 +328,41 @@ public class TestStandardProcessSession { assertDisabled(outputStreamHolder.get()); } + @Test + public void testExportTo() throws IOException { + final ContentClaim claim = contentRepo.create(false); + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile flowFile = session.get(); + assertNotNull(flowFile); + + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("Hello World".getBytes()); + } + }); + + // should be OK + ByteArrayOutputStream os = new ByteArrayOutputStream(); + session.exportTo(flowFile, os); + assertEquals("Hello World", new String(os.toByteArray())); + os.close(); + + // should throw ProcessException because of IOException (from processor code) + FileOutputStream mock = Mockito.mock(FileOutputStream.class); + doThrow(new IOException()).when(mock).write((byte[]) notNull(), any(Integer.class), any(Integer.class)); + try { + session.exportTo(flowFile, mock); + Assert.fail("Expected ProcessException"); + } catch (ProcessException e) { + } + } + @Test public void testReadAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false);