NIFI-516 adding option to StandardProcessSession.read to close stream

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Joseph Percivall 2015-10-07 10:50:07 -04:00 committed by Mark Payne
parent 1c1738670c
commit b885f955f4
6 changed files with 95 additions and 8 deletions

View File

@ -508,6 +508,33 @@ public interface ProcessSession {
*/ */
void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException; void read(FlowFile source, InputStreamCallback reader) throws FlowFileAccessException;
/**
* Executes the given callback against the contents corresponding to the
* given FlowFile, optionally letting the session keep the underlying
* stream open after the callback has completed.
*
* <i>Note</i>: The InputStream provided to the given InputStreamCallback
* will not be accessible once this method has completed its execution.
*
* @param source flowfile to retrieve content of
* @param allowSessionStreamManagement allow session to hold the stream open for performance reasons;
* when {@code false} the stream is closed once the callback has completed
* @param reader that will be called to read the flowfile content
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s)
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be
* found. The FlowFile should no longer be referenced, will be internally
* destroyed, and the session is automatically rolled back and what is left
* of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing
* FlowFile content; if an attempt is made to access the InputStream
* provided to the given InputStreamCallback after this method completed its
* execution
*/
void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException;
/** /**
* Combines the content of all given source FlowFiles into a single given * Combines the content of all given source FlowFiles into a single given
* destination FlowFile. * destination FlowFile.

View File

@ -400,6 +400,11 @@ public class MockProcessSession implements ProcessSession {
@Override @Override
public void read(final FlowFile flowFile, final InputStreamCallback callback) { public void read(final FlowFile flowFile, final InputStreamCallback callback) {
read(flowFile, false, callback);
}
@Override
public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
if (callback == null || flowFile == null) { if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null"); throw new IllegalArgumentException("argument cannot be null");
} }
@ -413,6 +418,9 @@ public class MockProcessSession implements ProcessSession {
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
try { try {
callback.process(bais); callback.process(bais);
if(!allowSessionStreamManagement){
bais.close();
}
} catch (final IOException e) { } catch (final IOException e) {
throw new ProcessException(e.toString(), e); throw new ProcessException(e.toString(), e);
} }

View File

@ -187,6 +187,11 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
session.read(source, reader); session.read(source, reader);
} }
/**
 * Forwards the read request, including the stream-management flag,
 * unchanged to the wrapped session.
 */
@Override
public void read(final FlowFile source, final boolean allowSessionStreamManagement, final InputStreamCallback reader) {
    // Pure delegation: the wrapped session decides what to do with the flag.
    this.session.read(source, allowSessionStreamManagement, reader);
}
@Override @Override
public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) { public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
return session.merge(sources, destination); return session.merge(sources, destination);

View File

@ -1770,6 +1770,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override @Override
public void read(final FlowFile source, final InputStreamCallback reader) { public void read(final FlowFile source, final InputStreamCallback reader) {
read(source, false, reader);
}
@Override
public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
validateRecordState(source); validateRecordState(source);
final StandardRepositoryRecord record = records.get(source); final StandardRepositoryRecord record = records.get(source);
@ -1780,9 +1785,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@ -1795,6 +1800,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try { try {
recursionSet.add(source); recursionSet.add(source);
reader.process(ffais); reader.process(ffais);
// Unless the caller lets the session manage the stream, close the claim stream right after reading so that we do not keep too many files open.
if(!allowSessionStreamManagement){
currentReadClaimStream.close();
currentReadClaimStream = null;
}
} catch (final ContentNotFoundException cnfe) { } catch (final ContentNotFoundException cnfe) {
cnfeThrown = true; cnfeThrown = true;
throw cnfe; throw cnfe;
@ -1806,6 +1817,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw ffais.getContentNotFoundException(); throw ffais.getContentNotFoundException();
} }
} }
} catch (final ContentNotFoundException nfe) { } catch (final ContentNotFoundException nfe) {
handleContentNotFound(nfe, record); handleContentNotFound(nfe, record);
} catch (final IOException ex) { } catch (final IOException ex) {

View File

@ -85,6 +85,7 @@ public class TestStandardProcessSession {
private StandardProcessSession session; private StandardProcessSession session;
private MockContentRepository contentRepo; private MockContentRepository contentRepo;
private FlowFileQueue flowFileQueue; private FlowFileQueue flowFileQueue;
private ProcessContext context;
private ProvenanceEventRepository provenanceRepo; private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo; private MockFlowFileRepository flowFileRepo;
@ -187,7 +188,7 @@ public class TestStandardProcessSession {
contentRepo.initialize(new StandardResourceClaimManager()); contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository(); flowFileRepo = new MockFlowFileRepository();
final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo); context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
session = new StandardProcessSession(context); session = new StandardProcessSession(context);
} }
@ -329,7 +330,7 @@ public class TestStandardProcessSession {
final FlowFile flowFile = session.get(); final FlowFile flowFile = session.get();
assertNotNull(flowFile); assertNotNull(flowFile);
final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null); final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);
session.read(flowFile, new InputStreamCallback() { session.read(flowFile, true , new InputStreamCallback() {
@Override @Override
public void process(final InputStream inputStream) throws IOException { public void process(final InputStream inputStream) throws IOException {
inputStreamHolder.set(inputStream); inputStreamHolder.set(inputStream);
@ -720,6 +721,40 @@ public class TestStandardProcessSession {
assertEquals("Hello, World", new String(buff)); assertEquals("Hello, World", new String(buff));
} }
/**
 * Creates a large number of sessions, writes a small payload through each
 * one and reads it back with {@code allowSessionStreamManagement} set to
 * {@code false}. The test passes if none of the reads throws.
 */
@Test
public void testManyFilesOpened() throws IOException {
    // Single source of truth for both the array size and the loop bound
    // (the two previously disagreed: 100000 slots vs. 70000 iterations).
    final int sessionCount = 70000;

    // Encode once with an explicit charset instead of relying on the
    // platform default on every iteration.
    final byte[] payload = "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    // Every session stays referenced until the test ends.
    // NOTE(review): presumably this keeps sessions (and any stream they still
    // hold) from being reclaimed early, so leaked file handles would
    // accumulate and surface as a failure - confirm intent.
    final StandardProcessSession[] standardProcessSessions = new StandardProcessSession[sessionCount];

    for (int i = 0; i < sessionCount; i++) {
        final StandardProcessSession currentSession = new StandardProcessSession(context);
        standardProcessSessions[i] = currentSession;

        FlowFile flowFile = currentSession.create();
        flowFile = currentSession.append(flowFile, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                out.write(payload);
            }
        });

        final byte[] buff = new byte[payload.length];
        try {
            currentSession.read(flowFile, false, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    StreamUtils.fillBuffer(in, buff);
                }
            });
        } catch (final Exception e) {
            // Report the failing iteration in the failure itself rather than
            // on stdout, and keep the original exception as the cause.
            throw new AssertionError("Failed at file:" + i, e);
        }
    }
}
@Test @Test
public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() { public void testMissingFlowFileExceptionThrownWhenUnableToReadDataStreamCallback() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()

View File

@ -567,7 +567,7 @@ public class MergeContent extends BinFiles {
final Iterator<FlowFileSessionWrapper> itr = wrappers.iterator(); final Iterator<FlowFileSessionWrapper> itr = wrappers.iterator();
while (itr.hasNext()) { while (itr.hasNext()) {
final FlowFileSessionWrapper wrapper = itr.next(); final FlowFileSessionWrapper wrapper = itr.next();
wrapper.getSession().read(wrapper.getFlowFile(), new InputStreamCallback() { wrapper.getSession().read(wrapper.getFlowFile(), false, new InputStreamCallback() {
@Override @Override
public void process(final InputStream in) throws IOException { public void process(final InputStream in) throws IOException {
StreamUtils.copy(in, out); StreamUtils.copy(in, out);
@ -780,7 +780,7 @@ public class MergeContent extends BinFiles {
for (final FlowFileSessionWrapper wrapper : wrappers) { for (final FlowFileSessionWrapper wrapper : wrappers) {
final FlowFile flowFile = wrapper.getFlowFile(); final FlowFile flowFile = wrapper.getFlowFile();
wrapper.getSession().read(flowFile, new InputStreamCallback() { wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
@Override @Override
public void process(final InputStream rawIn) throws IOException { public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) { try (final InputStream in = new BufferedInputStream(rawIn)) {
@ -893,7 +893,7 @@ public class MergeContent extends BinFiles {
try (final OutputStream out = new BufferedOutputStream(rawOut)) { try (final OutputStream out = new BufferedOutputStream(rawOut)) {
for (final FlowFileSessionWrapper wrapper : wrappers) { for (final FlowFileSessionWrapper wrapper : wrappers) {
final FlowFile flowFile = wrapper.getFlowFile(); final FlowFile flowFile = wrapper.getFlowFile();
wrapper.getSession().read(flowFile, new InputStreamCallback() { wrapper.getSession().read(flowFile, false, new InputStreamCallback() {
@Override @Override
public void process(InputStream in) throws IOException { public void process(InputStream in) throws IOException {
boolean canMerge = true; boolean canMerge = true;