diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index 80fa6c0b42..58f579f174 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -135,11 +135,18 @@ public interface ProcessSession {
* {@link #read(FlowFile, InputStreamCallback)}, {@link #read(FlowFile, boolean, InputStreamCallback)} for any of
* the given FlowFiles.
*
No InputStream can be open for the content of any of the given FlowFiles (see {@link #read(FlowFile)}).
- * Each of the FlowFiles provided must be the most up-to-date copy of the FlowFile.
+ * No OutputStream can be open for the content of any of the given FlowFiles (see {@link #write(FlowFile)}.
* For any provided FlowFile, if the FlowFile has any child (e.g., by calling {@link #create(FlowFile)} and passing the FlowFile
* as the argument), then all children that were created must also be in the Collection of provided FlowFiles.
*
*
+ *
+ * Also note, that if any FlowFile given is not the most up-to-date version of that FlowFile, then the most up-to-date
+ * version of the FlowFile will be migrated to the new owner. For example, if a call to {@link #putAttribute(FlowFile, String, String)} is made,
+ * passing flowFile1
as the FlowFile, and then flowFile1
is passed to this method, then the newest version (including the
+ * newly added attribute) will be migrated, not the outdated version of the FlowFile that flowFile1
points to.
+ *
+ *
* @param newOwner the ProcessSession that is to become the new owner of all FlowFiles
* that currently belong to {@code this}.
* @param flowFiles the FlowFiles to migrate
@@ -528,8 +535,11 @@ public interface ProcessSession {
* @param source flowfile to retrieve content of
* @param reader that will be called to read the flowfile content
* @throws IllegalStateException if detected that this method is being
- * called from within a callback of another method in this session and for
- * the given FlowFile(s)
+ * called from within a write callback of another method (i.e., from within the callback
+ * that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)})
+ * or has an OutputStream open (via a call to {@link #write(FlowFile)}) in this session and for
+ * the given FlowFile(s). Said another way, it is not permissible to call this method while writing to
+ * the same FlowFile.
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@@ -557,8 +567,11 @@ public interface ProcessSession {
* @param flowFile the FlowFile to read
* @return an InputStream that can be used to read the contents of the FlowFile
* @throws IllegalStateException if detected that this method is being
- * called from within a callback of another method in this session and for
- * the given FlowFile(s)
+ * called from within a write callback of another method (i.e., from within the callback
+ * that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)})
+ * or has an OutputStream open (via a call to {@link #write(FlowFile)}) in this session and for
+ * the given FlowFile(s). Said another way, it is not permissible to call this method while writing to
+ * the same FlowFile.
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@@ -580,8 +593,11 @@ public interface ProcessSession {
* @param allowSessionStreamManagement allow session to hold the stream open for performance reasons
* @param reader that will be called to read the flowfile content
* @throws IllegalStateException if detected that this method is being
- * called from within a callback of another method in this session and for
- * the given FlowFile(s)
+ * called from within a write callback of another method (i.e., from within the callback
+ * that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)})
+ * or has an OutputStream open (via a call to {@link #write(FlowFile)}) in this session and for
+ * the given FlowFile(s). Said another way, it is not permissible to call this method while writing to
+ * the same FlowFile.
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@@ -664,7 +680,8 @@ public interface ProcessSession {
* @return updated FlowFile
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
- * the given FlowFile(s)
+ * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
+ * (see {@link #read(FlowFile)} and {@link #write(FlowFile)}).
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@@ -679,6 +696,32 @@ public interface ProcessSession {
*/
FlowFile write(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException;
+ /**
+ * Provides an OutputStream that can be used to write to the contents of the
+ * given FlowFile.
+ *
+ * @param source to write to
+ *
+ * @return an OutputStream that can be used to write to the contents of the FlowFile
+ *
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
+ * (see {@link #read(FlowFile)}).
+ * @throws FlowFileHandlingException if the given FlowFile is already
+ * transferred or removed or doesn't belong to this session. Automatic
+ * rollback will occur.
+ * @throws MissingFlowFileException if the given FlowFile content cannot be
+ * found. The FlowFile should no longer be referenced, will be internally
+ * destroyed, and the session is automatically rolled back and what is left
+ * of the FlowFile is destroyed.
+ * @throws FlowFileAccessException if some IO problem occurs accessing
+ * FlowFile content; if an attempt is made to access the OutputStream
+ * provided to the given OutputStreamCallaback after this method completed
+ * its execution
+ */
+ OutputStream write(FlowFile source);
+
/**
* Executes the given callback against the content corresponding to the
* given flow file.
@@ -692,7 +735,8 @@ public interface ProcessSession {
* @return updated FlowFile
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
- * the given FlowFile(s)
+ * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
+ * (see {@link #read(FlowFile)} and {@link #write(FlowFile)}).
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@@ -721,6 +765,10 @@ public interface ProcessSession {
* @throws FlowFileAccessException if an attempt is made to access the
* OutputStream provided to the given OutputStreamCallaback after this
* method completed its execution
+ * @throws IllegalStateException if detected that this method is being
+ * called from within a callback of another method in this session and for
+ * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
+ * (see {@link #read(FlowFile)} and {@link #write(FlowFile)}).
*/
FlowFile append(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException;
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index f05b9b35ba..af472d6448 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -18,6 +18,7 @@ package org.apache.nifi.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -70,12 +71,15 @@ public class MockProcessSession implements ProcessSession {
private final Map originalVersions = new HashMap<>();
private final SharedSessionState sharedState;
private final Map counterMap = new HashMap<>();
- private final Set recursionSet = new HashSet<>();
+ private final Map readRecursionSet = new HashMap<>();
+ private final Set writeRecursionSet = new HashSet<>();
private final MockProvenanceReporter provenanceReporter;
- private final boolean enforceReadStreamsClosed;
+ private final boolean enforceStreamsClosed;
// A List of InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed.
private final Map openInputStreams = new HashMap<>();
+ // A List of OutputStreams that have been created by calls to {@link #write(FlowFile)} and have not yet been closed.
+ private final Map openOutputStreams = new HashMap<>();
private boolean committed = false;
private boolean rolledback = false;
@@ -87,9 +91,9 @@ public class MockProcessSession implements ProcessSession {
this(sharedState, processor, true);
}
- public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed) {
+ public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed) {
this.processor = processor;
- this.enforceReadStreamsClosed = enforceReadStreamsClosed;
+ this.enforceStreamsClosed = enforceStreamsClosed;
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
@@ -134,14 +138,26 @@ public class MockProcessSession implements ProcessSession {
for (final FlowFile flowFile : flowFiles) {
if (openInputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
- + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+ + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
}
- if (recursionSet.contains(flowFile)) {
+ if (openOutputStreams.containsKey(flowFile)) {
+ throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
+ }
+
+ if (readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
- ensureCurrentVersion(flowFile);
+ if (writeRecursionSet.contains(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
+ }
+
+ final FlowFile currentVersion = currentVersions.get(flowFile.getId());
+ if (currentVersion == null) {
+ throw new FlowFileHandlingException(flowFile + " is not known in this session");
+ }
}
for (final Map.Entry> entry : transferMap.entrySet()) {
@@ -185,8 +201,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile clone(final FlowFile flowFile) {
- validateState(flowFile);
+ public MockFlowFile clone(FlowFile flowFile) {
+ flowFile = validateState(flowFile);
final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
beingProcessed.add(newFlowFile.getId());
@@ -194,8 +210,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) {
- validateState(flowFile);
+ public MockFlowFile clone(FlowFile flowFile, final long offset, final long size) {
+ flowFile = validateState(flowFile);
if (offset + size > flowFile.getSize()) {
throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString());
}
@@ -209,26 +225,34 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
+ private void closeStreams(final Map streamMap, final boolean enforceClosed) {
+ final Map openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List
+ for (final Map.Entry entry : openStreamCopy.entrySet()) {
+ final FlowFile flowFile = entry.getKey();
+ final Closeable openStream = entry.getValue();
+
+ try {
+ openStream.close();
+ } catch (IOException e) {
+ throw new FlowFileAccessException("Failed to close stream for " + flowFile, e);
+ }
+
+ if (enforceClosed) {
+ throw new FlowFileHandlingException("Cannot commit session because the following streams were created via "
+ + "calls to ProcessSession.read(FlowFile) or ProcessSession.write(FlowFile) and never closed: " + streamMap);
+ }
+ }
+ }
+
+
@Override
public void commit() {
if (!beingProcessed.isEmpty()) {
throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed);
}
- if (!openInputStreams.isEmpty()) {
- final List openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by creating a copy of the List
- for (final InputStream openInputStream : openStreamCopy) {
- try {
- openInputStream.close();
- } catch (final IOException e) {
- }
- }
-
- if (enforceReadStreamsClosed) {
- throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
- + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
- }
- }
+ closeStreams(openInputStreams, enforceStreamsClosed);
+ closeStreams(openOutputStreams, enforceStreamsClosed);
committed = true;
beingProcessed.clear();
@@ -288,8 +312,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void exportTo(final FlowFile flowFile, final OutputStream out) {
- validateState(flowFile);
+ public void exportTo(FlowFile flowFile, final OutputStream out) {
+ flowFile = validateState(flowFile);
if (flowFile == null || out == null) {
throw new IllegalArgumentException("arguments cannot be null");
}
@@ -308,8 +332,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void exportTo(final FlowFile flowFile, final Path path, final boolean append) {
- validateState(flowFile);
+ public void exportTo(FlowFile flowFile, final Path path, final boolean append) {
+ flowFile = validateState(flowFile);
if (flowFile == null || path == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -391,8 +415,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) {
- validateState(flowFile);
+ public MockFlowFile importFrom(final InputStream in, FlowFile flowFile) {
+ flowFile = validateState(flowFile);
if (in == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -413,8 +437,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) {
- validateState(flowFile);
+ public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, FlowFile flowFile) {
+ flowFile = validateState(flowFile);
if (path == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -438,11 +462,9 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile merge(final Collection sources, final FlowFile destination) {
- for (final FlowFile source : sources) {
- validateState(source);
- }
- validateState(destination);
+ public MockFlowFile merge(Collection sources, FlowFile destination) {
+ sources = validateState(sources);
+ destination = validateState(destination);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (final FlowFile flowFile : sources) {
final MockFlowFile mock = (MockFlowFile) flowFile;
@@ -462,8 +484,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map attrs) {
- validateState(flowFile);
+ public MockFlowFile putAllAttributes(FlowFile flowFile, final Map attrs) {
+ flowFile = validateState(flowFile);
if (attrs == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -479,8 +501,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) {
- validateState(flowFile);
+ public MockFlowFile putAttribute(FlowFile flowFile, final String attrName, final String attrValue) {
+ flowFile = validateState(flowFile);
if (attrName == null || attrValue == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -508,19 +530,19 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
+ public void read(FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
- validateState(flowFile);
+ flowFile = validateState(flowFile);
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
final MockFlowFile mock = (MockFlowFile) flowFile;
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
- recursionSet.add(flowFile);
+ incrementReadCount(flowFile);
try {
callback.process(bais);
if(!allowSessionStreamManagement){
@@ -529,21 +551,35 @@ public class MockProcessSession implements ProcessSession {
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
} finally {
- recursionSet.remove(flowFile);
+ decrementReadCount(flowFile);
+ }
+ }
+
+ private void incrementReadCount(final FlowFile flowFile) {
+ readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 1 : count + 1);
+ }
+
+ private void decrementReadCount(final FlowFile flowFile) {
+ final Integer count = readRecursionSet.get(flowFile);
+ if (count == null) {
+ return;
+ }
+
+ final int updatedCount = count - 1;
+ if (updatedCount == 0) {
+ readRecursionSet.remove(flowFile);
+ } else {
+ readRecursionSet.put(flowFile, updatedCount);
}
}
@Override
- public InputStream read(final FlowFile flowFile) {
+ public InputStream read(FlowFile flowFile) {
if (flowFile == null) {
throw new IllegalArgumentException("FlowFile cannot be null");
}
- validateState(flowFile);
- if (!(flowFile instanceof MockFlowFile)) {
- throw new IllegalArgumentException("Cannot export a flow file that I did not create");
- }
- final MockFlowFile mock = (MockFlowFile) flowFile;
+ final MockFlowFile mock = validateState(flowFile);
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
final InputStream errorHandlingStream = new InputStream() {
@@ -559,23 +595,23 @@ public class MockProcessSession implements ProcessSession {
@Override
public void close() throws IOException {
- openInputStreams.remove(flowFile);
+ openInputStreams.remove(mock);
bais.close();
}
@Override
public String toString() {
- return "ErrorHandlingInputStream[flowFile=" + flowFile + "]";
+ return "ErrorHandlingInputStream[flowFile=" + mock + "]";
}
};
- openInputStreams.put(flowFile, errorHandlingStream);
+ openInputStreams.put(mock, errorHandlingStream);
return errorHandlingStream;
}
@Override
- public void remove(final FlowFile flowFile) {
- validateState(flowFile);
+ public void remove(FlowFile flowFile) {
+ flowFile = validateState(flowFile);
final Iterator penalizedItr = penalized.iterator();
while (penalizedItr.hasNext()) {
@@ -603,10 +639,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void remove(final Collection flowFiles) {
- for (final FlowFile flowFile : flowFiles) {
- validateState(flowFile);
- }
+ public void remove(Collection flowFiles) {
+ flowFiles = validateState(flowFiles);
for (final FlowFile flowFile : flowFiles) {
remove(flowFile);
@@ -614,8 +648,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set attrNames) {
- validateState(flowFile);
+ public MockFlowFile removeAllAttributes(FlowFile flowFile, final Set attrNames) {
+ flowFile = validateState(flowFile);
if (attrNames == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -632,8 +666,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
- validateState(flowFile);
+ public MockFlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) {
+ flowFile = validateState(flowFile);
if (flowFile == null) {
throw new IllegalArgumentException("flowFile cannot be null");
}
@@ -652,8 +686,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) {
- validateState(flowFile);
+ public MockFlowFile removeAttribute(FlowFile flowFile, final String attrName) {
+ flowFile = validateState(flowFile);
if (attrName == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -681,13 +715,9 @@ public class MockProcessSession implements ProcessSession {
if(committed){
return;
}
- final List openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by creating a copy of the List
- for (final InputStream openInputStream : openStreamCopy) {
- try {
- openInputStream.close();
- } catch (final IOException e) {
- }
- }
+
+ closeStreams(openInputStreams, false);
+ closeStreams(openOutputStreams, false);
for (final List list : transferMap.values()) {
for (final MockFlowFile flowFile : list) {
@@ -720,8 +750,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void transfer(final FlowFile flowFile) {
- validateState(flowFile);
+ public void transfer(FlowFile flowFile) {
+ flowFile = validateState(flowFile);
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("I only accept MockFlowFile");
}
@@ -748,7 +778,7 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void transfer(final FlowFile flowFile, final Relationship relationship) {
+ public void transfer(FlowFile flowFile, final Relationship relationship) {
if (relationship == Relationship.SELF) {
transfer(flowFile);
return;
@@ -757,7 +787,7 @@ public class MockProcessSession implements ProcessSession {
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
}
- validateState(flowFile);
+ flowFile = validateState(flowFile);
List list = transferMap.computeIfAbsent(relationship, r -> new ArrayList<>());
beingProcessed.remove(flowFile.getId());
@@ -766,7 +796,7 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public void transfer(final Collection flowFiles, final Relationship relationship) {
+ public void transfer(Collection flowFiles, final Relationship relationship) {
if (relationship == Relationship.SELF) {
transfer(flowFiles);
return;
@@ -777,8 +807,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) {
- validateState(flowFile);
+ public MockFlowFile write(FlowFile flowFile, final OutputStreamCallback callback) {
+ flowFile = validateState(flowFile);
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@@ -788,13 +818,13 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile mock = (MockFlowFile) flowFile;
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- recursionSet.add(flowFile);
+ writeRecursionSet.add(flowFile);
try {
callback.process(baos);
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
} finally {
- recursionSet.remove(flowFile);
+ writeRecursionSet.remove(flowFile);
}
final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
@@ -805,15 +835,35 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) {
- validateState(flowFile);
- if (callback == null || flowFile == null) {
- throw new IllegalArgumentException("argument cannot be null");
- }
+ public OutputStream write(FlowFile flowFile) {
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
- final MockFlowFile mock = (MockFlowFile) flowFile;
+
+ final MockFlowFile mockFlowFile = validateState(flowFile);
+ writeRecursionSet.add(flowFile);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ writeRecursionSet.remove(mockFlowFile);
+ final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
+ currentVersions.put(newFlowFile.getId(), newFlowFile);
+
+ newFlowFile.setData(toByteArray());
+ }
+ };
+
+ return baos;
+ }
+
+ @Override
+ public FlowFile append(FlowFile flowFile, final OutputStreamCallback callback) {
+ if (callback == null || flowFile == null) {
+ throw new IllegalArgumentException("argument cannot be null");
+ }
+ final MockFlowFile mock = validateState(flowFile);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
@@ -832,25 +882,21 @@ public class MockProcessSession implements ProcessSession {
@Override
public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) {
- validateState(flowFile);
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
- if (!(flowFile instanceof MockFlowFile)) {
- throw new IllegalArgumentException("Cannot export a flow file that I did not create");
- }
- final MockFlowFile mock = (MockFlowFile) flowFile;
+ final MockFlowFile mock = validateState(flowFile);
final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData());
final ByteArrayOutputStream out = new ByteArrayOutputStream();
- recursionSet.add(flowFile);
+ writeRecursionSet.add(flowFile);
try {
callback.process(in, out);
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
} finally {
- recursionSet.remove(flowFile);
+ writeRecursionSet.remove(flowFile);
}
final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
@@ -912,10 +958,8 @@ public class MockProcessSession implements ProcessSession {
@Override
public MockFlowFile merge(Collection sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
- for (final FlowFile flowFile : sources) {
- validateState(flowFile);
- }
- validateState(destination);
+ sources = validateState(sources);
+ destination = validateState(destination);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
@@ -945,30 +989,37 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
- private void validateState(final FlowFile flowFile) {
+ private List validateState(final Collection flowFiles) {
+ return flowFiles.stream()
+ .map(ff -> validateState(ff))
+ .collect(Collectors.toList());
+ }
+
+ private MockFlowFile validateState(final FlowFile flowFile) {
Objects.requireNonNull(flowFile);
- ensureCurrentVersion(flowFile);
- if (recursionSet.contains(flowFile)) {
+
+ final MockFlowFile currentVersion = currentVersions.get(flowFile.getId());
+ if (currentVersion == null) {
+ throw new FlowFileHandlingException(flowFile + " is not known in this session");
+ }
+
+ if (readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
+ if (writeRecursionSet.contains(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
+ }
+
for (final List flowFiles : transferMap.values()) {
if (flowFiles.contains(flowFile)) {
throw new IllegalStateException(flowFile + " has already been transferred");
}
}
+
+ return currentVersion;
}
- private void ensureCurrentVersion(final FlowFile flowFile) {
- final FlowFile currentVersion = currentVersions.get(flowFile.getId());
- if (currentVersion == null) {
- throw new FlowFileHandlingException(flowFile + " is not known in this session");
- }
-
- if (currentVersion != flowFile) {
- throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session");
- }
- }
/**
* Inherits the attributes from the given source flow file into another flow
@@ -1235,8 +1286,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
- public MockFlowFile penalize(final FlowFile flowFile) {
- validateState(flowFile);
+ public MockFlowFile penalize(FlowFile flowFile) {
+ flowFile = validateState(flowFile);
final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
@@ -1245,8 +1296,8 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
- public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
- validateState(flowFile);
+ public byte[] getContentAsByteArray(MockFlowFile flowFile) {
+ flowFile = validateState(flowFile);
return flowFile.getData();
}
diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
index d1c2bea7e9..1c554d2007 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java
@@ -38,11 +38,6 @@ import org.junit.Test;
public class TestMockProcessSession {
- @Test(expected = AssertionError.class)
- public void testPenalizeFlowFileFromProcessor() {
- TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run();
- }
-
@Test
public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException {
final Processor processor = new PoorlyBehavedProcessor();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index 90c9dc7918..73b6c5bd80 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -251,5 +251,10 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
public ProvenanceReporter getProvenanceReporter() {
return session.getProvenanceReporter();
}
+
+ @Override
+ public OutputStream write(final FlowFile source) {
+ return session.write(source);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 512461f8a5..b3079303f0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
+import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -110,7 +111,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map counters = new HashMap<>();
private final Map appendableStreams = new HashMap<>();
private final ProcessContext context;
- private final Set recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
+ private final Map readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
+ private final Set writeRecursionSet = new HashSet<>();
private final Map deleteOnCommit = new HashMap<>();
private final long sessionId;
private final String connectableDescription;
@@ -133,6 +135,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed
private final Map openInputStreams = new HashMap<>();
+ // List of OutputStreams that have been opened by calls to {@link #write(FlowFile)} and not yet closed
+ private final Map openOutputStreams = new HashMap<>();
// maps a FlowFile to all Provenance Events that were generated for that FlowFile.
// we do this so that if we generate a Fork event, for example, and then remove the event in the same
@@ -187,14 +191,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
processingStartTime = System.nanoTime();
}
- public void checkpoint() {
-
- resetWriteClaims(false);
-
- final Map openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
- for (final Map.Entry entry : openStreamCopy.entrySet()) {
+ private void closeStreams(final Map streamMap) {
+ final Map openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List
+ for (final Map.Entry entry : openStreamCopy.entrySet()) {
final FlowFile flowFile = entry.getKey();
- final InputStream openStream = entry.getValue();
+ final Closeable openStream = entry.getValue();
LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile);
@@ -205,8 +206,19 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
LOG.warn("", e);
}
}
+ }
- if (!recursionSet.isEmpty()) {
+ public void checkpoint() {
+
+ resetWriteClaims(false);
+
+ closeStreams(openInputStreams);
+ closeStreams(openOutputStreams);
+
+ if (!readRecursionSet.isEmpty()) {
+ throw new IllegalStateException();
+ }
+ if (!writeRecursionSet.isEmpty()) {
throw new IllegalStateException();
}
@@ -902,19 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
deleteOnCommit.clear();
- final Map openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
- for (final Map.Entry entry : openStreamCopy.entrySet()) {
- final FlowFile flowFile = entry.getKey();
- final InputStream openStream = entry.getValue();
-
- LOG.debug("{} closing {} for {} due to session rollback", this, openStream, flowFile);
- try {
- openStream.close();
- } catch (final Exception e) {
- LOG.warn("{} Attempted to close {} for {} due to session rollback but close failed", this, openStream, this.connectableDescription);
- LOG.warn("", e);
- }
- }
+ closeStreams(openInputStreams);
+ closeStreams(openOutputStreams);
try {
claimCache.reset();
@@ -1092,7 +1093,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void resetState() {
records.clear();
- recursionSet.clear();
+ readRecursionSet.clear();
+ writeRecursionSet.clear();
contentSizeIn = 0L;
contentSizeOut = 0L;
flowFilesIn = 0;
@@ -1142,12 +1144,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final FlowFile flowFile : flowFiles) {
if (openInputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
- + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+ + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
}
- if (recursionSet.contains(flowFile)) {
+ if (openOutputStreams.containsKey(flowFile)) {
+ throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
+ }
+
+ if (readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
+ if (writeRecursionSet.contains(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
+ }
+
final StandardRepositoryRecord record = records.get(flowFile);
if (record == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
@@ -1614,8 +1625,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile clone(final FlowFile example, final long offset, final long size) {
- validateRecordState(example);
+ public FlowFile clone(FlowFile example, final long offset, final long size) {
+ example = validateRecordState(example);
final StandardRepositoryRecord exampleRepoRecord = records.get(example);
final FlowFileRecord currRec = exampleRepoRecord.getCurrent();
final ContentClaim claim = exampleRepoRecord.getCurrentClaim();
@@ -1683,8 +1694,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile penalize(final FlowFile flowFile) {
- validateRecordState(flowFile);
+ public FlowFile penalize(FlowFile flowFile) {
+ flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
@@ -1693,8 +1704,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile putAttribute(final FlowFile flowFile, final String key, final String value) {
- validateRecordState(flowFile);
+ public FlowFile putAttribute(FlowFile flowFile, final String key, final String value) {
+ flowFile = validateRecordState(flowFile);
if (CoreAttributes.UUID.key().equals(key)) {
return flowFile;
@@ -1708,8 +1719,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile putAllAttributes(final FlowFile flowFile, final Map attributes) {
- validateRecordState(flowFile);
+ public FlowFile putAllAttributes(FlowFile flowFile, final Map attributes) {
+ flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final Map updatedAttributes;
@@ -1728,8 +1739,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile removeAttribute(final FlowFile flowFile, final String key) {
- validateRecordState(flowFile);
+ public FlowFile removeAttribute(FlowFile flowFile, final String key) {
+ flowFile = validateRecordState(flowFile);
if (CoreAttributes.UUID.key().equals(key)) {
return flowFile;
@@ -1742,8 +1753,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile removeAllAttributes(final FlowFile flowFile, final Set keys) {
- validateRecordState(flowFile);
+ public FlowFile removeAllAttributes(FlowFile flowFile, final Set keys) {
+ flowFile = validateRecordState(flowFile);
if (keys == null) {
return flowFile;
@@ -1766,8 +1777,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
- validateRecordState(flowFile);
+ public FlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) {
+ flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
@@ -1800,8 +1811,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void transfer(final FlowFile flowFile, final Relationship relationship) {
- validateRecordState(flowFile);
+ public void transfer(FlowFile flowFile, final Relationship relationship) {
+ flowFile = validateRecordState(flowFile);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
@@ -1830,8 +1841,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void transfer(final FlowFile flowFile) {
- validateRecordState(flowFile);
+ public void transfer(FlowFile flowFile) {
+ flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
if (record.getOriginalQueue() == null) {
throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
@@ -1848,8 +1859,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void transfer(final Collection flowFiles, final Relationship relationship) {
- validateRecordState(flowFiles);
+ public void transfer(Collection flowFiles, final Relationship relationship) {
+ flowFiles = validateRecordState(flowFiles);
boolean autoTerminated = false;
boolean selfRelationship = false;
@@ -1885,8 +1896,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void remove(final FlowFile flowFile) {
- validateRecordState(flowFile);
+ public void remove(FlowFile flowFile) {
+ flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
record.markForDelete();
removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
@@ -1906,8 +1917,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void remove(final Collection flowFiles) {
- validateRecordState(flowFiles);
+ public void remove(Collection flowFiles) {
+ flowFiles = validateRecordState(flowFiles);
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
record.markForDelete();
@@ -2044,7 +2055,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// If the recursion set is empty, we can use the same input stream that we already have open. However, if
// the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
// callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
- if (allowCachingOfStream && recursionSet.isEmpty()) {
+ if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
if (currentReadClaim == claim) {
if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
@@ -2097,7 +2108,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
- validateRecordState(source);
+ source = validateRecordState(source, true);
final StandardRepositoryRecord record = records.get(source);
try {
@@ -2121,7 +2132,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean cnfeThrown = false;
try {
- recursionSet.add(source);
+ incrementReadCount(source);
reader.process(ffais);
// Allow processors to close the file after reading to avoid too many files open or do smart session stream management.
@@ -2133,7 +2144,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
cnfeThrown = true;
throw cnfe;
} finally {
- recursionSet.remove(source);
+ decrementReadCount(source);
bytesRead += countingStream.getBytesRead();
// if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
@@ -2150,8 +2161,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public InputStream read(final FlowFile source) {
- validateRecordState(source);
+ public InputStream read(FlowFile source) {
+ source = validateRecordState(source, true);
final StandardRepositoryRecord record = records.get(source);
try {
@@ -2165,6 +2176,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+ final FlowFile sourceFlowFile = source;
final InputStream errorHandlingStream = new InputStream() {
private boolean closed = false;
@@ -2177,7 +2189,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
close();
throw cnfe;
} catch (final FlowFileAccessException ffae) {
- LOG.error("Failed to read content from " + source + "; rolling back session", ffae);
+ LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae);
rollback(true);
close();
throw ffae;
@@ -2198,7 +2210,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
close();
throw cnfe;
} catch (final FlowFileAccessException ffae) {
- LOG.error("Failed to read content from " + source + "; rolling back session", ffae);
+ LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae);
rollback(true);
close();
throw ffae;
@@ -2207,13 +2219,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void close() throws IOException {
+ decrementReadCount(sourceFlowFile);
+
if (!closed) {
StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
closed = true;
}
ffais.close();
- openInputStreams.remove(source);
+ openInputStreams.remove(sourceFlowFile);
}
@Override
@@ -2243,23 +2257,42 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public String toString() {
- return "ErrorHandlingInputStream[FlowFile=" + source + "]";
+ return "ErrorHandlingInputStream[FlowFile=" + sourceFlowFile + "]";
}
};
- openInputStreams.put(source, errorHandlingStream);
+ incrementReadCount(sourceFlowFile);
+ openInputStreams.put(sourceFlowFile, errorHandlingStream);
return errorHandlingStream;
}
+ private void incrementReadCount(final FlowFile flowFile) {
+ readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 1 : count + 1);
+ }
+
+ private void decrementReadCount(final FlowFile flowFile) {
+ final Integer count = readRecursionSet.get(flowFile);
+ if (count == null) {
+ return;
+ }
+
+ final int updatedCount = count - 1;
+ if (updatedCount == 0) {
+ readRecursionSet.remove(flowFile);
+ } else {
+ readRecursionSet.put(flowFile, updatedCount);
+ }
+ }
+
@Override
public FlowFile merge(final Collection sources, final FlowFile destination) {
return merge(sources, destination, null, null, null);
}
@Override
- public FlowFile merge(final Collection sources, final FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
- validateRecordState(sources);
- validateRecordState(destination);
+ public FlowFile merge(Collection sources, FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
+ sources = validateRecordState(sources);
+ destination = validateRecordState(destination);
if (sources.contains(destination)) {
throw new IllegalArgumentException("Destination cannot be within sources");
}
@@ -2358,8 +2391,125 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile write(final FlowFile source, final OutputStreamCallback writer) {
- validateRecordState(source);
+ public OutputStream write(FlowFile source) {
+ source = validateRecordState(source);
+ final StandardRepositoryRecord record = records.get(source);
+
+ ContentClaim newClaim = null;
+ try {
+ newClaim = claimCache.getContentClaim();
+ claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
+ ensureNotAppending(newClaim);
+
+ final OutputStream rawStream = claimCache.write(newClaim);
+ final OutputStream disableOnClose = new DisableOnCloseOutputStream(rawStream);
+ final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose);
+
+ final FlowFile sourceFlowFile = source;
+ final ContentClaim updatedClaim = newClaim;
+ final OutputStream errorHandlingOutputStream = new OutputStream() {
+ private boolean closed = false;
+
+ @Override
+ public void write(final int b) throws IOException {
+ try {
+ countingOut.write(b);
+ } catch (final IOException ioe) {
+ LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
+ rollback(true);
+ close();
+ throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
+ }
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ try {
+ countingOut.write(b);
+ } catch (final IOException ioe) {
+ LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
+ rollback(true);
+ close();
+ throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
+ }
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ try {
+ countingOut.write(b, off, len);
+ } catch (final IOException ioe) {
+ LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
+ rollback(true);
+ close();
+ throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ countingOut.flush();
+ } catch (final IOException ioe) {
+ LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
+ rollback(true);
+ close();
+ throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ writeRecursionSet.remove(sourceFlowFile);
+
+ final long bytesWritten = countingOut.getBytesWritten();
+ if (!closed) {
+ StandardProcessSession.this.bytesWritten += bytesWritten;
+ closed = true;
+ }
+
+ openOutputStreams.remove(sourceFlowFile);
+
+ removeTemporaryClaim(record);
+
+ flush();
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+ .fromFlowFile(record.getCurrent())
+ .contentClaim(updatedClaim)
+ .contentClaimOffset(Math.max(0, updatedClaim.getLength() - bytesWritten))
+ .size(bytesWritten)
+ .build();
+
+ record.setWorking(newFile);
+ }
+ };
+
+ writeRecursionSet.add(source);
+ openOutputStreams.put(source, errorHandlingOutputStream);
+ return errorHandlingOutputStream;
+ } catch (final ContentNotFoundException nfe) {
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
+ destroyContent(newClaim);
+ handleContentNotFound(nfe, record);
+ throw nfe;
+ } catch (final FlowFileAccessException ffae) {
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
+ destroyContent(newClaim);
+ throw ffae;
+ } catch (final IOException ioe) {
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
+ destroyContent(newClaim);
+ throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
+ } catch (final Throwable t) {
+ resetWriteClaims(); // need to reset write claim before we can remove the claim
+ destroyContent(newClaim);
+ throw t;
+ }
+ }
+
+ @Override
+ public FlowFile write(FlowFile source, final OutputStreamCallback writer) {
+ source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
long writtenToFlowFile = 0L;
@@ -2373,14 +2523,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
try {
- recursionSet.add(source);
+ writeRecursionSet.add(source);
writer.process(new FlowFileAccessOutputStream(countingOut, source));
} finally {
writtenToFlowFile = countingOut.getBytesWritten();
bytesWritten += countingOut.getBytesWritten();
}
} finally {
- recursionSet.remove(source);
+ writeRecursionSet.remove(source);
}
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
@@ -2412,9 +2562,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
+
@Override
- public FlowFile append(final FlowFile source, final OutputStreamCallback writer) {
- validateRecordState(source);
+ public FlowFile append(FlowFile source, final OutputStreamCallback writer) {
+ source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
long newSize = 0L;
@@ -2445,10 +2596,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// wrap our OutputStreams so that the processor cannot close it
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
- recursionSet.add(source);
+ writeRecursionSet.add(source);
writer.process(new FlowFileAccessOutputStream(disableOnClose, source));
} finally {
- recursionSet.remove(source);
+ writeRecursionSet.remove(source);
}
}
} else {
@@ -2458,10 +2609,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// wrap our OutputStreams so that the processor cannot close it
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream);
final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) {
- recursionSet.add(source);
+ writeRecursionSet.add(source);
writer.process(flowFileAccessOutStream);
} finally {
- recursionSet.remove(source);
+ writeRecursionSet.remove(source);
}
}
@@ -2593,8 +2744,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
- public FlowFile write(final FlowFile source, final StreamCallback writer) {
- validateRecordState(source);
+ public FlowFile write(FlowFile source, final StreamCallback writer) {
+ source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final ContentClaim currClaim = record.getCurrentClaim();
@@ -2618,7 +2769,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
- recursionSet.add(source);
+ writeRecursionSet.add(source);
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -2637,7 +2788,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
writtenToFlowFile = countingOut.getBytesWritten();
this.bytesWritten += writtenToFlowFile;
this.bytesRead += countingIn.getBytesRead();
- recursionSet.remove(source);
+ writeRecursionSet.remove(source);
// if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
@@ -2672,8 +2823,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
- validateRecordState(destination);
+ public FlowFile importFrom(final Path source, final boolean keepSourceFile, FlowFile destination) {
+ destination = validateRecordState(destination);
// TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) {
// If we do NOT want to keep the file, ensure that we can delete it, or else error.
@@ -2722,8 +2873,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public FlowFile importFrom(final InputStream source, final FlowFile destination) {
- validateRecordState(destination);
+ public FlowFile importFrom(final InputStream source, FlowFile destination) {
+ destination = validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
ContentClaim newClaim = null;
final long claimOffset = 0L;
@@ -2759,8 +2910,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void exportTo(final FlowFile source, final Path destination, final boolean append) {
- validateRecordState(source);
+ public void exportTo(FlowFile source, final Path destination, final boolean append) {
+ source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
try {
ensureNotAppending(record.getCurrentClaim());
@@ -2777,8 +2928,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public void exportTo(final FlowFile source, final OutputStream destination) {
- validateRecordState(source);
+ public void exportTo(FlowFile source, final OutputStream destination) {
+ source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
if(record.getCurrentClaim() == null) {
@@ -2806,13 +2957,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean cnfeThrown = false;
try {
- recursionSet.add(source);
+ incrementReadCount(source);
StreamUtils.copy(ffais, destination, source.getSize());
} catch (final ContentNotFoundException cnfe) {
cnfeThrown = true;
throw cnfe;
} finally {
- recursionSet.remove(source);
+ decrementReadCount(source);
+
IOUtils.closeQuietly(ffais);
// if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
@@ -2855,35 +3007,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
- private void validateRecordState(final FlowFile... flowFiles) {
- for (final FlowFile file : flowFiles) {
- if (recursionSet.contains(file)) {
- throw new IllegalStateException(file + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
- }
- final StandardRepositoryRecord record = records.get(file);
- if (record == null) {
- rollback();
- throw new FlowFileHandlingException(file + " is not known in this session (" + toString() + ")");
- }
- if (record.getCurrent() != file) {
- rollback();
- throw new FlowFileHandlingException(file + " is not the most recent version of this FlowFile within this session (" + toString() + ")");
- }
- if (record.getTransferRelationship() != null) {
- rollback();
- throw new FlowFileHandlingException(file + " is already marked for transfer");
- }
- if (record.isMarkedForDelete()) {
- rollback();
- throw new FlowFileHandlingException(file + " has already been marked for removal");
- }
- }
+ private FlowFile validateRecordState(final FlowFile flowFile) {
+ return validateRecordState(flowFile, false);
}
- private void validateRecordState(final Collection flowFiles) {
- for (final FlowFile flowFile : flowFiles) {
- validateRecordState(flowFile);
+ private FlowFile validateRecordState(final FlowFile flowFile, final boolean allowRecursiveRead) {
+ if (!allowRecursiveRead && readRecursionSet.containsKey(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
+ if (writeRecursionSet.contains(flowFile)) {
+ throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed");
+ }
+
+ final StandardRepositoryRecord record = records.get(flowFile);
+ if (record == null) {
+ rollback();
+ throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
+ }
+ if (record.getTransferRelationship() != null) {
+ rollback();
+ throw new FlowFileHandlingException(flowFile + " is already marked for transfer");
+ }
+ if (record.isMarkedForDelete()) {
+ rollback();
+ throw new FlowFileHandlingException(flowFile + " has already been marked for removal");
+ }
+
+ return record.getCurrent();
+ }
+
+ private List validateRecordState(final Collection flowFiles) {
+ final List current = new ArrayList<>(flowFiles.size());
+ for (final FlowFile flowFile : flowFiles) {
+ current.add(validateRecordState(flowFile));
+ }
+ return current;
}
/**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 1dcb9919dc..5a939aecbe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -1509,7 +1509,17 @@ public class TestStandardProcessSession {
StreamUtils.fillBuffer(in, buffer);
assertEquals("hello, world", new String(buffer));
+ try {
+ session.remove(flowFile);
+ Assert.fail("Was able to remove FlowFile while an InputStream is open for it");
+ } catch (final IllegalStateException e) {
+ // expected
+ }
+
+ in.close();
+
session.remove(flowFile);
+
session.commit(); // This should generate a WARN log message. We can't really test this in a unit test but can verify manually.
}
@@ -1544,6 +1554,96 @@ public class TestStandardProcessSession {
session.commit();
}
+ @Test
+ public void testWriteToOutputStream() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+
+ FlowFile flowFile = session.get();
+ try (final OutputStream out = session.write(flowFile)) {
+ out.write("hello, world".getBytes());
+ }
+
+ // Call putAllAttributes, because this will return to us the most recent version
+ // of the FlowFile. In a Processor, we wouldn't need this, but for testing purposes
+ // we need it in order to get the Content Claim.
+ flowFile = session.putAllAttributes(flowFile, Collections.emptyMap());
+ assertEquals(12L, flowFile.getSize());
+
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ try (final InputStream in = session.read(flowFile)) {
+ StreamUtils.fillBuffer(in, buffer);
+ }
+
+ assertEquals(new String(buffer), "hello, world");
+ }
+
+ @Test
+ public void testWriteToOutputStreamWhileReading() throws IOException {
+ final ContentClaim claim = contentRepo.create(false);
+ try (final OutputStream out = contentRepo.write(claim)) {
+ out.write("hello, world".getBytes());
+ }
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .contentClaim(claim)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+ InputStream in = session.read(flowFile);
+
+ try {
+ session.write(flowFile);
+ Assert.fail("Was able to obtain an OutputStream for a FlowFile while also holding an InputStream for it");
+ } catch (final IllegalStateException e) {
+ // expected
+ } finally {
+ in.close();
+ }
+
+ // Should now be okay
+ try (final OutputStream out = session.write(flowFile)) {
+
+ }
+ }
+
+ @Test
+ public void testReadFromInputStreamWhileWriting() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .size(12L)
+ .build();
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile flowFile = session.get();
+ OutputStream out = session.write(flowFile);
+
+ try {
+ session.read(flowFile);
+ Assert.fail("Was able to obtain an InputStream for a FlowFile while also holding an OutputStream for it");
+ } catch (final IllegalStateException e) {
+ // expected
+ } finally {
+ out.close();
+ }
+
+ // Should now be okay
+ try (final InputStream in = session.read(flowFile)) {
+
+ }
+ }
+
+
@Test
public void testTransferUnknownRelationship() {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
@@ -1672,6 +1772,31 @@ public class TestStandardProcessSession {
assertEquals(5, transientClaims.size());
}
+ @Test
+ public void testMultipleReadCounts() throws IOException {
+ flowFileQueue.put(new MockFlowFile(1L));
+
+ FlowFile flowFile = session.get();
+
+ final List streams = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ streams.add(session.read(flowFile));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ try {
+ flowFile = session.putAttribute(flowFile, "counter", String.valueOf(i));
+ Assert.fail("Was able to put attribute while reading");
+ } catch (final IllegalStateException ise) {
+ // expected
+ }
+
+ streams.get(i).close();
+ }
+
+ flowFile = session.putAttribute(flowFile, "counter", "4");
+ }
+
private static class MockFlowFileRepository implements FlowFileRepository {