NIFI-1866 ProcessException handling in StandardProcessSession

This commit is contained in:
Pierre Villard 2016-05-12 23:31:45 +02:00 committed by Mark Payne
parent 8a447eec66
commit 9e705a8468
2 changed files with 77 additions and 7 deletions

View File

@ -2340,18 +2340,49 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
public void exportTo(final FlowFile source, final OutputStream destination) {
validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
try {
if (record.getCurrentClaim() == null) {
if(record.getCurrentClaim() == null) {
return;
}
try {
ensureNotAppending(record.getCurrentClaim());
final long size = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, record.getCurrentClaimOffset(), source.getSize());
bytesRead.increment(size);
} catch (final IOException e) {
throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
}
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
// and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
// ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
// but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
boolean cnfeThrown = false;
try {
recursionSet.add(source);
StreamUtils.copy(ffais, destination, source.getSize());
} catch (final ContentNotFoundException cnfe) {
cnfeThrown = true;
throw cnfe;
} finally {
recursionSet.remove(source);
IOUtils.closeQuietly(ffais);
// if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
throw ffais.getContentNotFoundException();
}
}
} catch (final ContentNotFoundException nfe) {
handleContentNotFound(nfe, record);
} catch (final Throwable t) {
throw new FlowFileAccessException("Failed to export " + source + " to " + destination + " due to " + t.toString(), t);
} catch (final IOException ex) {
throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ex.toString(), ex);
}
}

View File

@ -21,8 +21,12 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@ -324,6 +328,41 @@ public class TestStandardProcessSession {
assertDisabled(outputStreamHolder.get());
}
@Test
public void testExportTo() throws IOException {
final ContentClaim claim = contentRepo.create(false);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
assertNotNull(flowFile);
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write("Hello World".getBytes());
}
});
// should be OK
ByteArrayOutputStream os = new ByteArrayOutputStream();
session.exportTo(flowFile, os);
assertEquals("Hello World", new String(os.toByteArray()));
os.close();
// should throw ProcessException because of IOException (from processor code)
FileOutputStream mock = Mockito.mock(FileOutputStream.class);
doThrow(new IOException()).when(mock).write((byte[]) notNull(), any(Integer.class), any(Integer.class));
try {
session.exportTo(flowFile, mock);
Assert.fail("Expected ProcessException");
} catch (ProcessException e) {
}
}
@Test
public void testReadAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);