NIFI-3860: Relax the constraint that the ProcessSession must always be given the most up-to-date version of a FlowFile

This closes #1778.
Mark Payne 2017-05-10 09:07:17 -04:00
parent 6ffb78d404
commit 603f713a40
6 changed files with 626 additions and 244 deletions


@ -135,11 +135,18 @@ public interface ProcessSession {
* {@link #read(FlowFile, InputStreamCallback)}, {@link #read(FlowFile, boolean, InputStreamCallback)} for any of
* the given FlowFiles.</li>
* <li>No InputStream can be open for the content of any of the given FlowFiles (see {@link #read(FlowFile)}).</li>
* <li>Each of the FlowFiles provided must be the most up-to-date copy of the FlowFile.</li>
* <li>No OutputStream can be open for the content of any of the given FlowFiles (see {@link #write(FlowFile)}).</li>
* <li>For any provided FlowFile, if the FlowFile has any child (e.g., by calling {@link #create(FlowFile)} and passing the FlowFile
* as the argument), then all children that were created must also be in the Collection of provided FlowFiles.</li>
* </ul>
*
* <p>
* Also note that if any FlowFile given is not the most up-to-date version of that FlowFile, then the most up-to-date
* version of the FlowFile will be migrated to the new owner. For example, if a call to {@link #putAttribute(FlowFile, String, String)} is made,
* passing <code>flowFile1</code> as the FlowFile, and then <code>flowFile1</code> is passed to this method, then the newest version (including the
* newly added attribute) will be migrated, not the outdated version of the FlowFile that <code>flowFile1</code> points to.
* </p>
*
* @param newOwner the ProcessSession that is to become the new owner of all FlowFiles
* that currently belong to {@code this}.
* @param flowFiles the FlowFiles to migrate
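As a rough illustration of the relaxed constraint (a hypothetical helper, not part of this commit; the class and method names are made up), a processor can hand a FlowFile off to another session even when its local reference has gone stale, and the newest version is what gets migrated:

import java.util.Collections;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

public class MigrateSketch {
    void handOff(final ProcessSession session, final ProcessSession newOwner, final FlowFile original) {
        // putAttribute() creates a newer version, so 'original' is now a stale reference
        session.putAttribute(original, "handed.off", "true");
        // the newest version (including the attribute added above) is migrated, not the stale one
        session.migrate(newOwner, Collections.singleton(original));
    }
}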
@ -528,8 +535,11 @@ public interface ProcessSession {
* @param source flowfile to retrieve content of
* @param reader that will be called to read the flowfile content
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s)
* called from within a write callback of another method (i.e., from within the callback
* that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)})
* for the given FlowFile(s), or if an OutputStream is open for the given FlowFile(s) in this session
* (via a call to {@link #write(FlowFile)}). Said another way, it is not permissible to call this method
* while writing to the same FlowFile.
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
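As a concrete illustration of this restriction (a hypothetical snippet, not from this commit; StreamUtils is NiFi's stream utility class), reading a different FlowFile from inside a write callback is permitted, whereas reading the FlowFile that is currently being written to would raise the IllegalStateException described above:

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.stream.io.StreamUtils;

public class NestedReadSketch {
    void copyContent(final ProcessSession session, final FlowFile source, final FlowFile target) {
        session.write(target, out -> {
            // session.read(target, in -> {});  // not permitted: 'target' is currently being written to
            session.read(source, in -> StreamUtils.copy(in, out, source.getSize())); // permitted: different FlowFile
        });
    }
}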
@ -557,8 +567,11 @@ public interface ProcessSession {
* @param flowFile the FlowFile to read
* @return an InputStream that can be used to read the contents of the FlowFile
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s)
* called from within a write callback of another method (i.e., from within the callback
* that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)})
* for the given FlowFile(s), or if an OutputStream is open for the given FlowFile(s) in this session
* (via a call to {@link #write(FlowFile)}). Said another way, it is not permissible to call this method
* while writing to the same FlowFile.
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
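A usage sketch for the stream-based read (hypothetical snippet, not part of this commit): the returned InputStream must be closed before the FlowFile is transferred or the session is committed, so try-with-resources is the natural pattern:

import java.io.IOException;
import java.io.InputStream;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class StreamingReadSketch {
    int readFirstByte(final ProcessSession session, final FlowFile flowFile) {
        try (final InputStream in = session.read(flowFile)) {
            return in.read(); // returns -1 if the FlowFile has no content
        } catch (final IOException e) {
            throw new ProcessException("Failed to read content of " + flowFile, e);
        }
    }
}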
@ -580,8 +593,11 @@ public interface ProcessSession {
* @param allowSessionStreamManagement allow session to hold the stream open for performance reasons
* @param reader that will be called to read the flowfile content
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s)
* called from within a write callback of another method (i.e., from within the callback
* that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)})
* for the given FlowFile(s), or if an OutputStream is open for the given FlowFile(s) in this session
* (via a call to {@link #write(FlowFile)}). Said another way, it is not permissible to call this method
* while writing to the same FlowFile.
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@ -664,7 +680,8 @@ public interface ProcessSession {
* @return updated FlowFile
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s)
* the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
* (see {@link #read(FlowFile)} and {@link #write(FlowFile)}).
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@ -679,6 +696,32 @@ public interface ProcessSession {
*/
FlowFile write(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException;
/**
* Provides an OutputStream that can be used to write to the contents of the
* given FlowFile.
*
* @param source to write to
*
* @return an OutputStream that can be used to write to the contents of the FlowFile
*
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
* (see {@link #read(FlowFile)}).
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
* @throws MissingFlowFileException if the given FlowFile content cannot be
* found. The FlowFile should no longer be referenced, will be internally
* destroyed, and the session is automatically rolled back and what is left
* of the FlowFile is destroyed.
* @throws FlowFileAccessException if some IO problem occurs accessing
* FlowFile content, or if an attempt is made to write to the returned
* OutputStream after the session has been committed or rolled back
*/
OutputStream write(FlowFile source);
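A usage sketch for the new stream-based write (hypothetical snippet; the content written is illustrative only): closing the stream is what finalizes the new content, so it should be closed before the FlowFile is transferred or the session is committed:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class StreamingWriteSketch {
    void replaceContent(final ProcessSession session, final FlowFile flowFile) {
        try (final OutputStream out = session.write(flowFile)) {
            out.write("hello".getBytes(StandardCharsets.UTF_8));
        } catch (final IOException e) {
            throw new ProcessException("Failed to write content of " + flowFile, e);
        }
    }
}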
/**
* Executes the given callback against the content corresponding to the
* given flow file.
@ -692,7 +735,8 @@ public interface ProcessSession {
* @return updated FlowFile
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s)
* the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
* (see {@link #read(FlowFile)} and {@link #write(FlowFile)}).
* @throws FlowFileHandlingException if the given FlowFile is already
* transferred or removed or doesn't belong to this session. Automatic
* rollback will occur.
@ -721,6 +765,10 @@ public interface ProcessSession {
* @throws FlowFileAccessException if an attempt is made to access the
* OutputStream provided to the given OutputStreamCallback after this
* method has completed its execution
* @throws IllegalStateException if detected that this method is being
* called from within a callback of another method in this session and for
* the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content
* (see {@link #read(FlowFile)} and {@link #write(FlowFile)}).
*/
FlowFile append(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException;
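And a short sketch of append() (hypothetical helper, not part of this commit): each call adds to the end of the FlowFile's content and returns the updated FlowFile; using the returned reference is still good practice, even though this commit relaxes the requirement to always pass the most recent version back to the session:

import java.nio.charset.StandardCharsets;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

public class AppendSketch {
    FlowFile appendLine(final ProcessSession session, final FlowFile flowFile, final String line) {
        return session.append(flowFile, out -> out.write((line + "\n").getBytes(StandardCharsets.UTF_8)));
    }
}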


@ -18,6 +18,7 @@ package org.apache.nifi.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -70,12 +71,15 @@ public class MockProcessSession implements ProcessSession {
private final Map<Long, MockFlowFile> originalVersions = new HashMap<>();
private final SharedSessionState sharedState;
private final Map<String, Long> counterMap = new HashMap<>();
private final Set<FlowFile> recursionSet = new HashSet<>();
private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();
private final Set<FlowFile> writeRecursionSet = new HashSet<>();
private final MockProvenanceReporter provenanceReporter;
private final boolean enforceReadStreamsClosed;
private final boolean enforceStreamsClosed;
// A Map of each FlowFile to the InputStream created by a call to {@link #read(FlowFile)} that has not yet been closed.
private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
// A Map of each FlowFile to the OutputStream created by a call to {@link #write(FlowFile)} that has not yet been closed.
private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>();
private boolean committed = false;
private boolean rolledback = false;
@ -87,9 +91,9 @@ public class MockProcessSession implements ProcessSession {
this(sharedState, processor, true);
}
public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed) {
public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed) {
this.processor = processor;
this.enforceReadStreamsClosed = enforceReadStreamsClosed;
this.enforceStreamsClosed = enforceStreamsClosed;
this.sharedState = sharedState;
this.processorQueue = sharedState.getFlowFileQueue();
provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName());
@ -134,14 +138,26 @@ public class MockProcessSession implements ProcessSession {
for (final FlowFile flowFile : flowFiles) {
if (openInputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+ "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
}
if (recursionSet.contains(flowFile)) {
if (openOutputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
}
if (readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
ensureCurrentVersion(flowFile);
if (writeRecursionSet.contains(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
}
final FlowFile currentVersion = currentVersions.get(flowFile.getId());
if (currentVersion == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session");
}
}
for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) {
@ -185,8 +201,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile clone(final FlowFile flowFile) {
validateState(flowFile);
public MockFlowFile clone(FlowFile flowFile) {
flowFile = validateState(flowFile);
final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
beingProcessed.add(newFlowFile.getId());
@ -194,8 +210,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) {
validateState(flowFile);
public MockFlowFile clone(FlowFile flowFile, final long offset, final long size) {
flowFile = validateState(flowFile);
if (offset + size > flowFile.getSize()) {
throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString());
}
@ -209,26 +225,34 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final boolean enforceClosed) {
final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List
for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) {
final FlowFile flowFile = entry.getKey();
final Closeable openStream = entry.getValue();
try {
openStream.close();
} catch (IOException e) {
throw new FlowFileAccessException("Failed to close stream for " + flowFile, e);
}
if (enforceClosed) {
throw new FlowFileHandlingException("Cannot commit session because the following streams were created via "
+ "calls to ProcessSession.read(FlowFile) or ProcessSession.write(FlowFile) and never closed: " + streamMap);
}
}
}
@Override
public void commit() {
if (!beingProcessed.isEmpty()) {
throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed);
}
if (!openInputStreams.isEmpty()) {
final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by creating a copy of the List
for (final InputStream openInputStream : openStreamCopy) {
try {
openInputStream.close();
} catch (final IOException e) {
}
}
if (enforceReadStreamsClosed) {
throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
+ "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
}
}
closeStreams(openInputStreams, enforceStreamsClosed);
closeStreams(openOutputStreams, enforceStreamsClosed);
committed = true;
beingProcessed.clear();
@ -288,8 +312,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void exportTo(final FlowFile flowFile, final OutputStream out) {
validateState(flowFile);
public void exportTo(FlowFile flowFile, final OutputStream out) {
flowFile = validateState(flowFile);
if (flowFile == null || out == null) {
throw new IllegalArgumentException("arguments cannot be null");
}
@ -308,8 +332,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void exportTo(final FlowFile flowFile, final Path path, final boolean append) {
validateState(flowFile);
public void exportTo(FlowFile flowFile, final Path path, final boolean append) {
flowFile = validateState(flowFile);
if (flowFile == null || path == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -391,8 +415,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) {
validateState(flowFile);
public MockFlowFile importFrom(final InputStream in, FlowFile flowFile) {
flowFile = validateState(flowFile);
if (in == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -413,8 +437,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) {
validateState(flowFile);
public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, FlowFile flowFile) {
flowFile = validateState(flowFile);
if (path == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -438,11 +462,9 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
for (final FlowFile source : sources) {
validateState(source);
}
validateState(destination);
public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
sources = validateState(sources);
destination = validateState(destination);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (final FlowFile flowFile : sources) {
final MockFlowFile mock = (MockFlowFile) flowFile;
@ -462,8 +484,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attrs) {
validateState(flowFile);
public MockFlowFile putAllAttributes(FlowFile flowFile, final Map<String, String> attrs) {
flowFile = validateState(flowFile);
if (attrs == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -479,8 +501,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) {
validateState(flowFile);
public MockFlowFile putAttribute(FlowFile flowFile, final String attrName, final String attrValue) {
flowFile = validateState(flowFile);
if (attrName == null || attrValue == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -508,19 +530,19 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
public void read(FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) {
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
validateState(flowFile);
flowFile = validateState(flowFile);
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
final MockFlowFile mock = (MockFlowFile) flowFile;
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
recursionSet.add(flowFile);
incrementReadCount(flowFile);
try {
callback.process(bais);
if(!allowSessionStreamManagement){
@ -529,21 +551,35 @@ public class MockProcessSession implements ProcessSession {
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
} finally {
recursionSet.remove(flowFile);
decrementReadCount(flowFile);
}
}
private void incrementReadCount(final FlowFile flowFile) {
readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 1 : count + 1);
}
private void decrementReadCount(final FlowFile flowFile) {
final Integer count = readRecursionSet.get(flowFile);
if (count == null) {
return;
}
final int updatedCount = count - 1;
if (updatedCount == 0) {
readRecursionSet.remove(flowFile);
} else {
readRecursionSet.put(flowFile, updatedCount);
}
}
@Override
public InputStream read(final FlowFile flowFile) {
public InputStream read(FlowFile flowFile) {
if (flowFile == null) {
throw new IllegalArgumentException("FlowFile cannot be null");
}
validateState(flowFile);
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
final MockFlowFile mock = (MockFlowFile) flowFile;
final MockFlowFile mock = validateState(flowFile);
final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
final InputStream errorHandlingStream = new InputStream() {
@ -559,23 +595,23 @@ public class MockProcessSession implements ProcessSession {
@Override
public void close() throws IOException {
openInputStreams.remove(flowFile);
openInputStreams.remove(mock);
bais.close();
}
@Override
public String toString() {
return "ErrorHandlingInputStream[flowFile=" + flowFile + "]";
return "ErrorHandlingInputStream[flowFile=" + mock + "]";
}
};
openInputStreams.put(flowFile, errorHandlingStream);
openInputStreams.put(mock, errorHandlingStream);
return errorHandlingStream;
}
@Override
public void remove(final FlowFile flowFile) {
validateState(flowFile);
public void remove(FlowFile flowFile) {
flowFile = validateState(flowFile);
final Iterator<MockFlowFile> penalizedItr = penalized.iterator();
while (penalizedItr.hasNext()) {
@ -603,10 +639,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void remove(final Collection<FlowFile> flowFiles) {
for (final FlowFile flowFile : flowFiles) {
validateState(flowFile);
}
public void remove(Collection<FlowFile> flowFiles) {
flowFiles = validateState(flowFiles);
for (final FlowFile flowFile : flowFiles) {
remove(flowFile);
@ -614,8 +648,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> attrNames) {
validateState(flowFile);
public MockFlowFile removeAllAttributes(FlowFile flowFile, final Set<String> attrNames) {
flowFile = validateState(flowFile);
if (attrNames == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -632,8 +666,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
validateState(flowFile);
public MockFlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) {
flowFile = validateState(flowFile);
if (flowFile == null) {
throw new IllegalArgumentException("flowFile cannot be null");
}
@ -652,8 +686,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) {
validateState(flowFile);
public MockFlowFile removeAttribute(FlowFile flowFile, final String attrName) {
flowFile = validateState(flowFile);
if (attrName == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -681,13 +715,9 @@ public class MockProcessSession implements ProcessSession {
if(committed){
return;
}
final List<InputStream> openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by creating a copy of the List
for (final InputStream openInputStream : openStreamCopy) {
try {
openInputStream.close();
} catch (final IOException e) {
}
}
closeStreams(openInputStreams, false);
closeStreams(openOutputStreams, false);
for (final List<MockFlowFile> list : transferMap.values()) {
for (final MockFlowFile flowFile : list) {
@ -720,8 +750,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void transfer(final FlowFile flowFile) {
validateState(flowFile);
public void transfer(FlowFile flowFile) {
flowFile = validateState(flowFile);
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("I only accept MockFlowFile");
}
@ -748,7 +778,7 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
public void transfer(FlowFile flowFile, final Relationship relationship) {
if (relationship == Relationship.SELF) {
transfer(flowFile);
return;
@ -757,7 +787,7 @@ public class MockProcessSession implements ProcessSession {
throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known");
}
validateState(flowFile);
flowFile = validateState(flowFile);
List<MockFlowFile> list = transferMap.computeIfAbsent(relationship, r -> new ArrayList<>());
beingProcessed.remove(flowFile.getId());
@ -766,7 +796,7 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
public void transfer(Collection<FlowFile> flowFiles, final Relationship relationship) {
if (relationship == Relationship.SELF) {
transfer(flowFiles);
return;
@ -777,8 +807,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) {
validateState(flowFile);
public MockFlowFile write(FlowFile flowFile, final OutputStreamCallback callback) {
flowFile = validateState(flowFile);
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
@ -788,13 +818,13 @@ public class MockProcessSession implements ProcessSession {
final MockFlowFile mock = (MockFlowFile) flowFile;
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
recursionSet.add(flowFile);
writeRecursionSet.add(flowFile);
try {
callback.process(baos);
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
} finally {
recursionSet.remove(flowFile);
writeRecursionSet.remove(flowFile);
}
final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
@ -805,15 +835,35 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) {
validateState(flowFile);
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
public OutputStream write(FlowFile flowFile) {
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
final MockFlowFile mock = (MockFlowFile) flowFile;
final MockFlowFile mockFlowFile = validateState(flowFile);
writeRecursionSet.add(flowFile);
final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
@Override
public void close() throws IOException {
super.close();
writeRecursionSet.remove(mockFlowFile);
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
newFlowFile.setData(toByteArray());
}
};
return baos;
}
@Override
public FlowFile append(FlowFile flowFile, final OutputStreamCallback callback) {
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
final MockFlowFile mock = validateState(flowFile);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
@ -832,25 +882,21 @@ public class MockProcessSession implements ProcessSession {
@Override
public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) {
validateState(flowFile);
if (callback == null || flowFile == null) {
throw new IllegalArgumentException("argument cannot be null");
}
if (!(flowFile instanceof MockFlowFile)) {
throw new IllegalArgumentException("Cannot export a flow file that I did not create");
}
final MockFlowFile mock = (MockFlowFile) flowFile;
final MockFlowFile mock = validateState(flowFile);
final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData());
final ByteArrayOutputStream out = new ByteArrayOutputStream();
recursionSet.add(flowFile);
writeRecursionSet.add(flowFile);
try {
callback.process(in, out);
} catch (final IOException e) {
throw new ProcessException(e.toString(), e);
} finally {
recursionSet.remove(flowFile);
writeRecursionSet.remove(flowFile);
}
final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile);
@ -912,10 +958,8 @@ public class MockProcessSession implements ProcessSession {
@Override
public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
for (final FlowFile flowFile : sources) {
validateState(flowFile);
}
validateState(destination);
sources = validateState(sources);
destination = validateState(destination);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
@ -945,30 +989,37 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
private void validateState(final FlowFile flowFile) {
private List<FlowFile> validateState(final Collection<FlowFile> flowFiles) {
return flowFiles.stream()
.map(ff -> validateState(ff))
.collect(Collectors.toList());
}
private MockFlowFile validateState(final FlowFile flowFile) {
Objects.requireNonNull(flowFile);
ensureCurrentVersion(flowFile);
if (recursionSet.contains(flowFile)) {
final MockFlowFile currentVersion = currentVersions.get(flowFile.getId());
if (currentVersion == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session");
}
if (readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
if (writeRecursionSet.contains(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
}
for (final List<MockFlowFile> flowFiles : transferMap.values()) {
if (flowFiles.contains(flowFile)) {
throw new IllegalStateException(flowFile + " has already been transferred");
}
}
return currentVersion;
}
private void ensureCurrentVersion(final FlowFile flowFile) {
final FlowFile currentVersion = currentVersions.get(flowFile.getId());
if (currentVersion == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session");
}
if (currentVersion != flowFile) {
throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session");
}
}
/**
* Inherits the attributes from the given source flow file into another flow
@ -1235,8 +1286,8 @@ public class MockProcessSession implements ProcessSession {
}
@Override
public MockFlowFile penalize(final FlowFile flowFile) {
validateState(flowFile);
public MockFlowFile penalize(FlowFile flowFile) {
flowFile = validateState(flowFile);
final MockFlowFile mockFlowFile = (MockFlowFile) flowFile;
final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile);
currentVersions.put(newFlowFile.getId(), newFlowFile);
@ -1245,8 +1296,8 @@ public class MockProcessSession implements ProcessSession {
return newFlowFile;
}
public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
validateState(flowFile);
public byte[] getContentAsByteArray(MockFlowFile flowFile) {
flowFile = validateState(flowFile);
return flowFile.getData();
}


@ -38,11 +38,6 @@ import org.junit.Test;
public class TestMockProcessSession {
@Test(expected = AssertionError.class)
public void testPenalizeFlowFileFromProcessor() {
TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run();
}
@Test
public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException {
final Processor processor = new PoorlyBehavedProcessor();


@ -251,5 +251,10 @@ public class BatchingSessionFactory implements ProcessSessionFactory {
public ProvenanceReporter getProvenanceReporter() {
return session.getProvenanceReporter();
}
@Override
public OutputStream write(final FlowFile source) {
return session.write(source);
}
}
}


@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@ -110,7 +111,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<String, Long> counters = new HashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
private final ProcessContext context;
private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
private final Map<FlowFile, Integer> readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
private final Set<FlowFile> writeRecursionSet = new HashSet<>();
private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
private final long sessionId;
private final String connectableDescription;
@ -133,6 +135,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// Maps a FlowFile to the InputStream opened by a call to {@link #read(FlowFile)} that has not yet been closed
private final Map<FlowFile, InputStream> openInputStreams = new HashMap<>();
// Maps a FlowFile to the OutputStream opened by a call to {@link #write(FlowFile)} that has not yet been closed
private final Map<FlowFile, OutputStream> openOutputStreams = new HashMap<>();
// maps a FlowFile to all Provenance Events that were generated for that FlowFile.
// we do this so that if we generate a Fork event, for example, and then remove the event in the same
@ -187,14 +191,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
processingStartTime = System.nanoTime();
}
public void checkpoint() {
resetWriteClaims(false);
final Map<FlowFile, InputStream> openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
for (final Map.Entry<FlowFile, InputStream> entry : openStreamCopy.entrySet()) {
private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap) {
final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List
for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) {
final FlowFile flowFile = entry.getKey();
final InputStream openStream = entry.getValue();
final Closeable openStream = entry.getValue();
LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile);
@ -205,8 +206,19 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
LOG.warn("", e);
}
}
}
if (!recursionSet.isEmpty()) {
public void checkpoint() {
resetWriteClaims(false);
closeStreams(openInputStreams);
closeStreams(openOutputStreams);
if (!readRecursionSet.isEmpty()) {
throw new IllegalStateException();
}
if (!writeRecursionSet.isEmpty()) {
throw new IllegalStateException();
}
@ -902,19 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
deleteOnCommit.clear();
final Map<FlowFile, InputStream> openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List
for (final Map.Entry<FlowFile, InputStream> entry : openStreamCopy.entrySet()) {
final FlowFile flowFile = entry.getKey();
final InputStream openStream = entry.getValue();
LOG.debug("{} closing {} for {} due to session rollback", this, openStream, flowFile);
try {
openStream.close();
} catch (final Exception e) {
LOG.warn("{} Attempted to close {} for {} due to session rollback but close failed", this, openStream, this.connectableDescription);
LOG.warn("", e);
}
}
closeStreams(openInputStreams);
closeStreams(openOutputStreams);
try {
claimCache.reset();
@ -1092,7 +1093,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void resetState() {
records.clear();
recursionSet.clear();
readRecursionSet.clear();
writeRecursionSet.clear();
contentSizeIn = 0L;
contentSizeOut = 0L;
flowFilesIn = 0;
@ -1142,12 +1144,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final FlowFile flowFile : flowFiles) {
if (openInputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
+ "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
}
if (recursionSet.contains(flowFile)) {
if (openOutputStreams.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently "
+ "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)");
}
if (readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
if (writeRecursionSet.contains(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed");
}
final StandardRepositoryRecord record = records.get(flowFile);
if (record == null) {
throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
@ -1614,8 +1625,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile clone(final FlowFile example, final long offset, final long size) {
validateRecordState(example);
public FlowFile clone(FlowFile example, final long offset, final long size) {
example = validateRecordState(example);
final StandardRepositoryRecord exampleRepoRecord = records.get(example);
final FlowFileRecord currRec = exampleRepoRecord.getCurrent();
final ContentClaim claim = exampleRepoRecord.getCurrentClaim();
@ -1683,8 +1694,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile penalize(final FlowFile flowFile) {
validateRecordState(flowFile);
public FlowFile penalize(FlowFile flowFile) {
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build();
@ -1693,8 +1704,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile putAttribute(final FlowFile flowFile, final String key, final String value) {
validateRecordState(flowFile);
public FlowFile putAttribute(FlowFile flowFile, final String key, final String value) {
flowFile = validateRecordState(flowFile);
if (CoreAttributes.UUID.key().equals(key)) {
return flowFile;
@ -1708,8 +1719,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attributes) {
validateRecordState(flowFile);
public FlowFile putAllAttributes(FlowFile flowFile, final Map<String, String> attributes) {
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final Map<String, String> updatedAttributes;
@ -1728,8 +1739,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile removeAttribute(final FlowFile flowFile, final String key) {
validateRecordState(flowFile);
public FlowFile removeAttribute(FlowFile flowFile, final String key) {
flowFile = validateRecordState(flowFile);
if (CoreAttributes.UUID.key().equals(key)) {
return flowFile;
@ -1742,8 +1753,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> keys) {
validateRecordState(flowFile);
public FlowFile removeAllAttributes(FlowFile flowFile, final Set<String> keys) {
flowFile = validateRecordState(flowFile);
if (keys == null) {
return flowFile;
@ -1766,8 +1777,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) {
validateRecordState(flowFile);
public FlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) {
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
@ -1800,8 +1811,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
validateRecordState(flowFile);
public void transfer(FlowFile flowFile, final Relationship relationship) {
flowFile = validateRecordState(flowFile);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
@ -1830,8 +1841,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void transfer(final FlowFile flowFile) {
validateRecordState(flowFile);
public void transfer(FlowFile flowFile) {
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
if (record.getOriginalQueue() == null) {
throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
@ -1848,8 +1859,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) {
validateRecordState(flowFiles);
public void transfer(Collection<FlowFile> flowFiles, final Relationship relationship) {
flowFiles = validateRecordState(flowFiles);
boolean autoTerminated = false;
boolean selfRelationship = false;
@ -1885,8 +1896,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void remove(final FlowFile flowFile) {
validateRecordState(flowFile);
public void remove(FlowFile flowFile) {
flowFile = validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
record.markForDelete();
removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
@ -1906,8 +1917,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void remove(final Collection<FlowFile> flowFiles) {
validateRecordState(flowFiles);
public void remove(Collection<FlowFile> flowFiles) {
flowFiles = validateRecordState(flowFiles);
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
record.markForDelete();
@ -2044,7 +2055,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// If the recursion set is empty, we can use the same input stream that we already have open. However, if
// the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the
// callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
if (allowCachingOfStream && recursionSet.isEmpty()) {
if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) {
if (currentReadClaim == claim) {
if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
@ -2097,7 +2108,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
validateRecordState(source);
source = validateRecordState(source, true);
final StandardRepositoryRecord record = records.get(source);
try {
@ -2121,7 +2132,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean cnfeThrown = false;
try {
recursionSet.add(source);
incrementReadCount(source);
reader.process(ffais);
// Allow processors to close the file after reading to avoid too many files open or do smart session stream management.
@ -2133,7 +2144,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
cnfeThrown = true;
throw cnfe;
} finally {
recursionSet.remove(source);
decrementReadCount(source);
bytesRead += countingStream.getBytesRead();
// if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
@ -2150,8 +2161,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public InputStream read(final FlowFile source) {
validateRecordState(source);
public InputStream read(FlowFile source) {
source = validateRecordState(source, true);
final StandardRepositoryRecord record = records.get(source);
try {
@ -2165,6 +2176,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
final FlowFile sourceFlowFile = source;
final InputStream errorHandlingStream = new InputStream() {
private boolean closed = false;
@ -2177,7 +2189,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
close();
throw cnfe;
} catch (final FlowFileAccessException ffae) {
LOG.error("Failed to read content from " + source + "; rolling back session", ffae);
LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae);
rollback(true);
close();
throw ffae;
@ -2198,7 +2210,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
close();
throw cnfe;
} catch (final FlowFileAccessException ffae) {
LOG.error("Failed to read content from " + source + "; rolling back session", ffae);
LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae);
rollback(true);
close();
throw ffae;
@ -2207,13 +2219,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void close() throws IOException {
decrementReadCount(sourceFlowFile);
if (!closed) {
StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
closed = true;
}
ffais.close();
openInputStreams.remove(source);
openInputStreams.remove(sourceFlowFile);
}
@Override
@ -2243,23 +2257,42 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public String toString() {
return "ErrorHandlingInputStream[FlowFile=" + source + "]";
return "ErrorHandlingInputStream[FlowFile=" + sourceFlowFile + "]";
}
};
openInputStreams.put(source, errorHandlingStream);
incrementReadCount(sourceFlowFile);
openInputStreams.put(sourceFlowFile, errorHandlingStream);
return errorHandlingStream;
}
private void incrementReadCount(final FlowFile flowFile) {
readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 1 : count + 1);
}
private void decrementReadCount(final FlowFile flowFile) {
final Integer count = readRecursionSet.get(flowFile);
if (count == null) {
return;
}
final int updatedCount = count - 1;
if (updatedCount == 0) {
readRecursionSet.remove(flowFile);
} else {
readRecursionSet.put(flowFile, updatedCount);
}
}
@Override
public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) {
return merge(sources, destination, null, null, null);
}
@Override
public FlowFile merge(final Collection<FlowFile> sources, final FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
validateRecordState(sources);
validateRecordState(destination);
public FlowFile merge(Collection<FlowFile> sources, FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) {
sources = validateRecordState(sources);
destination = validateRecordState(destination);
if (sources.contains(destination)) {
throw new IllegalArgumentException("Destination cannot be within sources");
}
@ -2358,8 +2391,125 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile write(final FlowFile source, final OutputStreamCallback writer) {
validateRecordState(source);
public OutputStream write(FlowFile source) {
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
ContentClaim newClaim = null;
try {
newClaim = claimCache.getContentClaim();
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
ensureNotAppending(newClaim);
final OutputStream rawStream = claimCache.write(newClaim);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(rawStream);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose);
final FlowFile sourceFlowFile = source;
final ContentClaim updatedClaim = newClaim;
final OutputStream errorHandlingOutputStream = new OutputStream() {
private boolean closed = false;
@Override
public void write(final int b) throws IOException {
try {
countingOut.write(b);
} catch (final IOException ioe) {
LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
rollback(true);
close();
throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
}
}
@Override
public void write(final byte[] b) throws IOException {
try {
countingOut.write(b);
} catch (final IOException ioe) {
LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
rollback(true);
close();
throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
}
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
try {
countingOut.write(b, off, len);
} catch (final IOException ioe) {
LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
rollback(true);
close();
throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
}
}
@Override
public void flush() throws IOException {
try {
countingOut.flush();
} catch (final IOException ioe) {
LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe);
rollback(true);
close();
throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe);
}
}
@Override
public void close() throws IOException {
writeRecursionSet.remove(sourceFlowFile);
final long bytesWritten = countingOut.getBytesWritten();
if (!closed) {
StandardProcessSession.this.bytesWritten += bytesWritten;
closed = true;
}
openOutputStreams.remove(sourceFlowFile);
removeTemporaryClaim(record);
flush();
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
.fromFlowFile(record.getCurrent())
.contentClaim(updatedClaim)
.contentClaimOffset(Math.max(0, updatedClaim.getLength() - bytesWritten))
.size(bytesWritten)
.build();
record.setWorking(newFile);
}
};
writeRecursionSet.add(source);
openOutputStreams.put(source, errorHandlingOutputStream);
return errorHandlingOutputStream;
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
handleContentNotFound(nfe, record);
throw nfe;
} catch (final FlowFileAccessException ffae) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw ffae;
} catch (final IOException ioe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
} catch (final Throwable t) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw t;
}
}
@Override
public FlowFile write(FlowFile source, final OutputStreamCallback writer) {
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
long writtenToFlowFile = 0L;
@ -2373,14 +2523,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
try {
recursionSet.add(source);
writeRecursionSet.add(source);
writer.process(new FlowFileAccessOutputStream(countingOut, source));
} finally {
writtenToFlowFile = countingOut.getBytesWritten();
bytesWritten += countingOut.getBytesWritten();
}
} finally {
recursionSet.remove(source);
writeRecursionSet.remove(source);
}
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
@ -2412,9 +2562,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
@Override
public FlowFile append(final FlowFile source, final OutputStreamCallback writer) {
validateRecordState(source);
public FlowFile append(FlowFile source, final OutputStreamCallback writer) {
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
long newSize = 0L;
@ -2445,10 +2596,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// wrap our OutputStreams so that the processor cannot close it
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
recursionSet.add(source);
writeRecursionSet.add(source);
writer.process(new FlowFileAccessOutputStream(disableOnClose, source));
} finally {
recursionSet.remove(source);
writeRecursionSet.remove(source);
}
}
} else {
@ -2458,10 +2609,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// wrap our OutputStreams so that the processor cannot close it
try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream);
final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) {
recursionSet.add(source);
writeRecursionSet.add(source);
writer.process(flowFileAccessOutStream);
} finally {
recursionSet.remove(source);
writeRecursionSet.remove(source);
}
}
@ -2593,8 +2744,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public FlowFile write(final FlowFile source, final StreamCallback writer) {
validateRecordState(source);
public FlowFile write(FlowFile source, final StreamCallback writer) {
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
final ContentClaim currClaim = record.getCurrentClaim();
@ -2618,7 +2769,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
recursionSet.add(source);
writeRecursionSet.add(source);
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@ -2637,7 +2788,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
writtenToFlowFile = countingOut.getBytesWritten();
this.bytesWritten += writtenToFlowFile;
this.bytesRead += countingIn.getBytesRead();
recursionSet.remove(source);
writeRecursionSet.remove(source);
// if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
@ -2672,8 +2823,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
validateRecordState(destination);
public FlowFile importFrom(final Path source, final boolean keepSourceFile, FlowFile destination) {
destination = validateRecordState(destination);
// TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) {
// If we do NOT want to keep the file, ensure that we can delete it, or else error.
@ -2722,8 +2873,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public FlowFile importFrom(final InputStream source, final FlowFile destination) {
validateRecordState(destination);
public FlowFile importFrom(final InputStream source, FlowFile destination) {
destination = validateRecordState(destination);
final StandardRepositoryRecord record = records.get(destination);
ContentClaim newClaim = null;
final long claimOffset = 0L;
@ -2759,8 +2910,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void exportTo(final FlowFile source, final Path destination, final boolean append) {
validateRecordState(source);
public void exportTo(FlowFile source, final Path destination, final boolean append) {
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
try {
ensureNotAppending(record.getCurrentClaim());
@ -2777,8 +2928,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void exportTo(final FlowFile source, final OutputStream destination) {
validateRecordState(source);
public void exportTo(FlowFile source, final OutputStream destination) {
source = validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
if(record.getCurrentClaim() == null) {
@ -2806,13 +2957,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean cnfeThrown = false;
try {
recursionSet.add(source);
incrementReadCount(source);
StreamUtils.copy(ffais, destination, source.getSize());
} catch (final ContentNotFoundException cnfe) {
cnfeThrown = true;
throw cnfe;
} finally {
recursionSet.remove(source);
decrementReadCount(source);
IOUtils.closeQuietly(ffais);
// if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
@ -2855,35 +3007,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
private void validateRecordState(final FlowFile... flowFiles) {
for (final FlowFile file : flowFiles) {
if (recursionSet.contains(file)) {
throw new IllegalStateException(file + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
final StandardRepositoryRecord record = records.get(file);
if (record == null) {
rollback();
throw new FlowFileHandlingException(file + " is not known in this session (" + toString() + ")");
}
if (record.getCurrent() != file) {
rollback();
throw new FlowFileHandlingException(file + " is not the most recent version of this FlowFile within this session (" + toString() + ")");
}
if (record.getTransferRelationship() != null) {
rollback();
throw new FlowFileHandlingException(file + " is already marked for transfer");
}
if (record.isMarkedForDelete()) {
rollback();
throw new FlowFileHandlingException(file + " has already been marked for removal");
}
}
private FlowFile validateRecordState(final FlowFile flowFile) {
return validateRecordState(flowFile, false);
}
private void validateRecordState(final Collection<FlowFile> flowFiles) {
for (final FlowFile flowFile : flowFiles) {
validateRecordState(flowFile);
private FlowFile validateRecordState(final FlowFile flowFile, final boolean allowRecursiveRead) {
if (!allowRecursiveRead && readRecursionSet.containsKey(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed");
}
if (writeRecursionSet.contains(flowFile)) {
throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed");
}
final StandardRepositoryRecord record = records.get(flowFile);
if (record == null) {
rollback();
throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")");
}
if (record.getTransferRelationship() != null) {
rollback();
throw new FlowFileHandlingException(flowFile + " is already marked for transfer");
}
if (record.isMarkedForDelete()) {
rollback();
throw new FlowFileHandlingException(flowFile + " has already been marked for removal");
}
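// Return the most up-to-date version of the FlowFile so that callers always operate on the current record (see NIFI-3860).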
return record.getCurrent();
}
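// Validates each FlowFile and returns the most up-to-date version of each, in the same order as the given collection.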
private List<FlowFile> validateRecordState(final Collection<FlowFile> flowFiles) {
final List<FlowFile> current = new ArrayList<>(flowFiles.size());
for (final FlowFile flowFile : flowFiles) {
current.add(validateRecordState(flowFile));
}
return current;
}
/**
View File
@ -1509,7 +1509,17 @@ public class TestStandardProcessSession {
StreamUtils.fillBuffer(in, buffer);
assertEquals("hello, world", new String(buffer));
try {
session.remove(flowFile);
Assert.fail("Was able to remove FlowFile while an InputStream is open for it");
} catch (final IllegalStateException e) {
// expected
}
in.close();
session.remove(flowFile);
session.commit(); // This should generate a WARN log message. We can't really test this in a unit test but can verify manually.
}
@ -1544,6 +1554,96 @@ public class TestStandardProcessSession {
session.commit();
}
@Test
public void testWriteToOutputStream() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.size(12L)
.build();
flowFileQueue.put(flowFileRecord);
FlowFile flowFile = session.get();
try (final OutputStream out = session.write(flowFile)) {
out.write("hello, world".getBytes());
}
// Call putAllAttributes, because this will return to us the most recent version
// of the FlowFile. In a Processor, we wouldn't need this, but for testing purposes
// we need it in order to get the Content Claim.
flowFile = session.putAllAttributes(flowFile, Collections.emptyMap());
assertEquals(12L, flowFile.getSize());
final byte[] buffer = new byte[(int) flowFile.getSize()];
try (final InputStream in = session.read(flowFile)) {
StreamUtils.fillBuffer(in, buffer);
}
assertEquals("hello, world", new String(buffer));
}
@Test
public void testWriteToOutputStreamWhileReading() throws IOException {
final ContentClaim claim = contentRepo.create(false);
try (final OutputStream out = contentRepo.write(claim)) {
out.write("hello, world".getBytes());
}
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.contentClaim(claim)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.size(12L)
.build();
flowFileQueue.put(flowFileRecord);
final FlowFile flowFile = session.get();
InputStream in = session.read(flowFile);
try {
session.write(flowFile);
Assert.fail("Was able to obtain an OutputStream for a FlowFile while also holding an InputStream for it");
} catch (final IllegalStateException e) {
// expected
} finally {
in.close();
}
// Should now be okay
try (final OutputStream out = session.write(flowFile)) {
}
}
@Test
public void testReadFromInputStreamWhileWriting() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.size(12L)
.build();
flowFileQueue.put(flowFileRecord);
final FlowFile flowFile = session.get();
OutputStream out = session.write(flowFile);
try {
session.read(flowFile);
Assert.fail("Was able to obtain an InputStream for a FlowFile while also holding an OutputStream for it");
} catch (final IllegalStateException e) {
// expected
} finally {
out.close();
}
// Should now be okay
try (final InputStream in = session.read(flowFile)) {
}
}
@Test
public void testTransferUnknownRelationship() {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
@ -1672,6 +1772,31 @@ public class TestStandardProcessSession {
assertEquals(5, transientClaims.size());
}
@Test
public void testMultipleReadCounts() throws IOException {
flowFileQueue.put(new MockFlowFile(1L));
FlowFile flowFile = session.get();
final List<InputStream> streams = new ArrayList<>();
for (int i = 0; i < 3; i++) {
streams.add(session.read(flowFile));
}
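// Each open InputStream increments the FlowFile's read count; the FlowFile cannot be modified until every stream has been closed.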
for (int i = 0; i < 3; i++) {
try {
flowFile = session.putAttribute(flowFile, "counter", String.valueOf(i));
Assert.fail("Was able to put attribute while reading");
} catch (final IllegalStateException ise) {
// expected
}
streams.get(i).close();
}
flowFile = session.putAttribute(flowFile, "counter", "4");
}
private static class MockFlowFileRepository implements FlowFileRepository {