From 603f713a4020a29f21a868dfec10a07c1c03b1b2 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 10 May 2017 09:07:17 -0400 Subject: [PATCH] NIFI-3860: Relax the constraint that the ProcessSession must always be given the most up-to-date version of a FlowFile This closes #1778. --- .../apache/nifi/processor/ProcessSession.java | 66 ++- .../apache/nifi/util/MockProcessSession.java | 285 +++++++------ .../nifi/util/TestMockProcessSession.java | 5 - .../repository/BatchingSessionFactory.java | 5 + .../repository/StandardProcessSession.java | 384 ++++++++++++------ .../TestStandardProcessSession.java | 125 ++++++ 6 files changed, 626 insertions(+), 244 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index 80fa6c0b42..58f579f174 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -135,11 +135,18 @@ public interface ProcessSession { * {@link #read(FlowFile, InputStreamCallback)}, {@link #read(FlowFile, boolean, InputStreamCallback)} for any of * the given FlowFiles. *
  • No InputStream can be open for the content of any of the given FlowFiles (see {@link #read(FlowFile)}).
  • - *
  • Each of the FlowFiles provided must be the most up-to-date copy of the FlowFile.
  • + *
  • No OutputStream can be open for the content of any of the given FlowFiles (see {@link #write(FlowFile)}).
  • *
  • For any provided FlowFile, if the FlowFile has any child (e.g., by calling {@link #create(FlowFile)} and passing the FlowFile * as the argument), then all children that were created must also be in the Collection of provided FlowFiles.
  • * * + *

    + * Also note that if any FlowFile given is not the most up-to-date version of that FlowFile, then the most up-to-date + * version of the FlowFile will be migrated to the new owner. For example, if a call to {@link #putAttribute(FlowFile, String, String)} is made, + * passing flowFile1 as the FlowFile, and then flowFile1 is passed to this method, then the newest version (including the + * newly added attribute) will be migrated, not the outdated version of the FlowFile that flowFile1 points to. + *

    + * * @param newOwner the ProcessSession that is to become the new owner of all FlowFiles * that currently belong to {@code this}. * @param flowFiles the FlowFiles to migrate @@ -528,8 +535,11 @@ public interface ProcessSession { * @param source flowfile to retrieve content of * @param reader that will be called to read the flowfile content * @throws IllegalStateException if detected that this method is being - * called from within a callback of another method in this session and for - * the given FlowFile(s) + * called from within a write callback of another method (i.e., from within the callback + * that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)}) + * or has an OutputStream open (via a call to {@link #write(FlowFile)}) in this session and for + * the given FlowFile(s). Said another way, it is not permissible to call this method while writing to + * the same FlowFile. * @throws FlowFileHandlingException if the given FlowFile is already * transferred or removed or doesn't belong to this session. Automatic * rollback will occur. @@ -557,8 +567,11 @@ public interface ProcessSession { * @param flowFile the FlowFile to read * @return an InputStream that can be used to read the contents of the FlowFile * @throws IllegalStateException if detected that this method is being - * called from within a callback of another method in this session and for - * the given FlowFile(s) + * called from within a write callback of another method (i.e., from within the callback + * that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)}) + * or has an OutputStream open (via a call to {@link #write(FlowFile)}) in this session and for + * the given FlowFile(s). Said another way, it is not permissible to call this method while writing to + * the same FlowFile. 
* @throws FlowFileHandlingException if the given FlowFile is already * transferred or removed or doesn't belong to this session. Automatic * rollback will occur. @@ -580,8 +593,11 @@ public interface ProcessSession { * @param allowSessionStreamManagement allow session to hold the stream open for performance reasons * @param reader that will be called to read the flowfile content * @throws IllegalStateException if detected that this method is being - * called from within a callback of another method in this session and for - * the given FlowFile(s) + * called from within a write callback of another method (i.e., from within the callback + * that is passed to {@link #write(FlowFile, OutputStreamCallback)} or {@link #write(FlowFile, StreamCallback)}) + * or has an OutputStream open (via a call to {@link #write(FlowFile)}) in this session and for + * the given FlowFile(s). Said another way, it is not permissible to call this method while writing to + * the same FlowFile. * @throws FlowFileHandlingException if the given FlowFile is already * transferred or removed or doesn't belong to this session. Automatic * rollback will occur. @@ -664,7 +680,8 @@ public interface ProcessSession { * @return updated FlowFile * @throws IllegalStateException if detected that this method is being * called from within a callback of another method in this session and for - * the given FlowFile(s) + * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content + * (see {@link #read(FlowFile)} and {@link #write(FlowFile)}). * @throws FlowFileHandlingException if the given FlowFile is already * transferred or removed or doesn't belong to this session. Automatic * rollback will occur. @@ -679,6 +696,32 @@ public interface ProcessSession { */ FlowFile write(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException; + /** + * Provides an OutputStream that can be used to write to the contents of the + * given FlowFile. 
+ * + * @param source to write to + * + * @return an OutputStream that can be used to write to the contents of the FlowFile + * + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content + * (see {@link #read(FlowFile)}). + * @throws FlowFileHandlingException if the given FlowFile is already + * transferred or removed or doesn't belong to this session. Automatic + * rollback will occur. + * @throws MissingFlowFileException if the given FlowFile content cannot be + * found. The FlowFile should no longer be referenced, will be internally + * destroyed, and the session is automatically rolled back and what is left + * of the FlowFile is destroyed. + * @throws FlowFileAccessException if some IO problem occurs accessing + * FlowFile content; if an attempt is made to access the OutputStream + * provided to the given OutputStreamCallaback after this method completed + * its execution + */ + OutputStream write(FlowFile source); + /** * Executes the given callback against the content corresponding to the * given flow file. @@ -692,7 +735,8 @@ public interface ProcessSession { * @return updated FlowFile * @throws IllegalStateException if detected that this method is being * called from within a callback of another method in this session and for - * the given FlowFile(s) + * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content + * (see {@link #read(FlowFile)} and {@link #write(FlowFile)}). * @throws FlowFileHandlingException if the given FlowFile is already * transferred or removed or doesn't belong to this session. Automatic * rollback will occur. 
@@ -721,6 +765,10 @@ public interface ProcessSession { * @throws FlowFileAccessException if an attempt is made to access the * OutputStream provided to the given OutputStreamCallaback after this * method completed its execution + * @throws IllegalStateException if detected that this method is being + * called from within a callback of another method in this session and for + * the given FlowFile(s), or if there is an open InputStream or OutputStream for the FlowFile's content + * (see {@link #read(FlowFile)} and {@link #write(FlowFile)}). */ FlowFile append(FlowFile source, OutputStreamCallback writer) throws FlowFileAccessException; diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index f05b9b35ba..af472d6448 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -18,6 +18,7 @@ package org.apache.nifi.util; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -70,12 +71,15 @@ public class MockProcessSession implements ProcessSession { private final Map originalVersions = new HashMap<>(); private final SharedSessionState sharedState; private final Map counterMap = new HashMap<>(); - private final Set recursionSet = new HashSet<>(); + private final Map readRecursionSet = new HashMap<>(); + private final Set writeRecursionSet = new HashSet<>(); private final MockProvenanceReporter provenanceReporter; - private final boolean enforceReadStreamsClosed; + private final boolean enforceStreamsClosed; // A List of InputStreams that have been created by calls to {@link #read(FlowFile)} and have not yet been closed. 
private final Map openInputStreams = new HashMap<>(); + // A List of OutputStreams that have been created by calls to {@link #write(FlowFile)} and have not yet been closed. + private final Map openOutputStreams = new HashMap<>(); private boolean committed = false; private boolean rolledback = false; @@ -87,9 +91,9 @@ public class MockProcessSession implements ProcessSession { this(sharedState, processor, true); } - public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceReadStreamsClosed) { + public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed) { this.processor = processor; - this.enforceReadStreamsClosed = enforceReadStreamsClosed; + this.enforceStreamsClosed = enforceStreamsClosed; this.sharedState = sharedState; this.processorQueue = sharedState.getFlowFileQueue(); provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName()); @@ -134,14 +138,26 @@ public class MockProcessSession implements ProcessSession { for (final FlowFile flowFile : flowFiles) { if (openInputStreams.containsKey(flowFile)) { throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " - + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)"); + + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)"); } - if (recursionSet.contains(flowFile)) { + if (openOutputStreams.containsKey(flowFile)) { + throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " + + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)"); + } + + if (readRecursionSet.containsKey(flowFile)) { throw new IllegalStateException(flowFile + " already in use for an active callback or 
InputStream created by ProcessSession.read(FlowFile) has not been closed"); } - ensureCurrentVersion(flowFile); + if (writeRecursionSet.contains(flowFile)) { + throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed"); + } + + final FlowFile currentVersion = currentVersions.get(flowFile.getId()); + if (currentVersion == null) { + throw new FlowFileHandlingException(flowFile + " is not known in this session"); + } } for (final Map.Entry> entry : transferMap.entrySet()) { @@ -185,8 +201,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile clone(final FlowFile flowFile) { - validateState(flowFile); + public MockFlowFile clone(FlowFile flowFile) { + flowFile = validateState(flowFile); final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); currentVersions.put(newFlowFile.getId(), newFlowFile); beingProcessed.add(newFlowFile.getId()); @@ -194,8 +210,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) { - validateState(flowFile); + public MockFlowFile clone(FlowFile flowFile, final long offset, final long size) { + flowFile = validateState(flowFile); if (offset + size > flowFile.getSize()) { throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString()); } @@ -209,26 +225,34 @@ public class MockProcessSession implements ProcessSession { return newFlowFile; } + private void closeStreams(final Map streamMap, final boolean enforceClosed) { + final Map openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List + for (final Map.Entry entry : openStreamCopy.entrySet()) { + final FlowFile flowFile = entry.getKey(); + final Closeable openStream = 
entry.getValue(); + + try { + openStream.close(); + } catch (IOException e) { + throw new FlowFileAccessException("Failed to close stream for " + flowFile, e); + } + + if (enforceClosed) { + throw new FlowFileHandlingException("Cannot commit session because the following streams were created via " + + "calls to ProcessSession.read(FlowFile) or ProcessSession.write(FlowFile) and never closed: " + streamMap); + } + } + } + + @Override public void commit() { if (!beingProcessed.isEmpty()) { throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed); } - if (!openInputStreams.isEmpty()) { - final List openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by creating a copy of the List - for (final InputStream openInputStream : openStreamCopy) { - try { - openInputStream.close(); - } catch (final IOException e) { - } - } - - if (enforceReadStreamsClosed) { - throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via " - + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy); - } - } + closeStreams(openInputStreams, enforceStreamsClosed); + closeStreams(openOutputStreams, enforceStreamsClosed); committed = true; beingProcessed.clear(); @@ -288,8 +312,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public void exportTo(final FlowFile flowFile, final OutputStream out) { - validateState(flowFile); + public void exportTo(FlowFile flowFile, final OutputStream out) { + flowFile = validateState(flowFile); if (flowFile == null || out == null) { throw new IllegalArgumentException("arguments cannot be null"); } @@ -308,8 +332,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public void exportTo(final FlowFile flowFile, final Path path, final boolean append) { - validateState(flowFile); + public void 
exportTo(FlowFile flowFile, final Path path, final boolean append) { + flowFile = validateState(flowFile); if (flowFile == null || path == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -391,8 +415,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) { - validateState(flowFile); + public MockFlowFile importFrom(final InputStream in, FlowFile flowFile) { + flowFile = validateState(flowFile); if (in == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -413,8 +437,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) { - validateState(flowFile); + public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, FlowFile flowFile) { + flowFile = validateState(flowFile); if (path == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -438,11 +462,9 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile merge(final Collection sources, final FlowFile destination) { - for (final FlowFile source : sources) { - validateState(source); - } - validateState(destination); + public MockFlowFile merge(Collection sources, FlowFile destination) { + sources = validateState(sources); + destination = validateState(destination); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); for (final FlowFile flowFile : sources) { final MockFlowFile mock = (MockFlowFile) flowFile; @@ -462,8 +484,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map attrs) { - validateState(flowFile); + public MockFlowFile putAllAttributes(FlowFile flowFile, final Map attrs) { + flowFile = 
validateState(flowFile); if (attrs == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -479,8 +501,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) { - validateState(flowFile); + public MockFlowFile putAttribute(FlowFile flowFile, final String attrName, final String attrValue) { + flowFile = validateState(flowFile); if (attrName == null || attrValue == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -508,19 +530,19 @@ public class MockProcessSession implements ProcessSession { } @Override - public void read(final FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) { + public void read(FlowFile flowFile, boolean allowSessionStreamManagement, final InputStreamCallback callback) { if (callback == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } - validateState(flowFile); + flowFile = validateState(flowFile); if (!(flowFile instanceof MockFlowFile)) { throw new IllegalArgumentException("Cannot export a flow file that I did not create"); } final MockFlowFile mock = (MockFlowFile) flowFile; final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); - recursionSet.add(flowFile); + incrementReadCount(flowFile); try { callback.process(bais); if(!allowSessionStreamManagement){ @@ -529,21 +551,35 @@ public class MockProcessSession implements ProcessSession { } catch (final IOException e) { throw new ProcessException(e.toString(), e); } finally { - recursionSet.remove(flowFile); + decrementReadCount(flowFile); + } + } + + private void incrementReadCount(final FlowFile flowFile) { + readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 
1 : count + 1); + } + + private void decrementReadCount(final FlowFile flowFile) { + final Integer count = readRecursionSet.get(flowFile); + if (count == null) { + return; + } + + final int updatedCount = count - 1; + if (updatedCount == 0) { + readRecursionSet.remove(flowFile); + } else { + readRecursionSet.put(flowFile, updatedCount); } } @Override - public InputStream read(final FlowFile flowFile) { + public InputStream read(FlowFile flowFile) { if (flowFile == null) { throw new IllegalArgumentException("FlowFile cannot be null"); } - validateState(flowFile); - if (!(flowFile instanceof MockFlowFile)) { - throw new IllegalArgumentException("Cannot export a flow file that I did not create"); - } - final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile mock = validateState(flowFile); final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); final InputStream errorHandlingStream = new InputStream() { @@ -559,23 +595,23 @@ public class MockProcessSession implements ProcessSession { @Override public void close() throws IOException { - openInputStreams.remove(flowFile); + openInputStreams.remove(mock); bais.close(); } @Override public String toString() { - return "ErrorHandlingInputStream[flowFile=" + flowFile + "]"; + return "ErrorHandlingInputStream[flowFile=" + mock + "]"; } }; - openInputStreams.put(flowFile, errorHandlingStream); + openInputStreams.put(mock, errorHandlingStream); return errorHandlingStream; } @Override - public void remove(final FlowFile flowFile) { - validateState(flowFile); + public void remove(FlowFile flowFile) { + flowFile = validateState(flowFile); final Iterator penalizedItr = penalized.iterator(); while (penalizedItr.hasNext()) { @@ -603,10 +639,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public void remove(final Collection flowFiles) { - for (final FlowFile flowFile : flowFiles) { - validateState(flowFile); - } + public void remove(Collection flowFiles) { + flowFiles = 
validateState(flowFiles); for (final FlowFile flowFile : flowFiles) { remove(flowFile); @@ -614,8 +648,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set attrNames) { - validateState(flowFile); + public MockFlowFile removeAllAttributes(FlowFile flowFile, final Set attrNames) { + flowFile = validateState(flowFile); if (attrNames == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -632,8 +666,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) { - validateState(flowFile); + public MockFlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) { + flowFile = validateState(flowFile); if (flowFile == null) { throw new IllegalArgumentException("flowFile cannot be null"); } @@ -652,8 +686,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) { - validateState(flowFile); + public MockFlowFile removeAttribute(FlowFile flowFile, final String attrName) { + flowFile = validateState(flowFile); if (attrName == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -681,13 +715,9 @@ public class MockProcessSession implements ProcessSession { if(committed){ return; } - final List openStreamCopy = new ArrayList<>(openInputStreams.values()); // avoid ConcurrentModificationException by creating a copy of the List - for (final InputStream openInputStream : openStreamCopy) { - try { - openInputStream.close(); - } catch (final IOException e) { - } - } + + closeStreams(openInputStreams, false); + closeStreams(openOutputStreams, false); for (final List list : transferMap.values()) { for (final MockFlowFile flowFile : list) { @@ -720,8 +750,8 @@ public class 
MockProcessSession implements ProcessSession { } @Override - public void transfer(final FlowFile flowFile) { - validateState(flowFile); + public void transfer(FlowFile flowFile) { + flowFile = validateState(flowFile); if (!(flowFile instanceof MockFlowFile)) { throw new IllegalArgumentException("I only accept MockFlowFile"); } @@ -748,7 +778,7 @@ public class MockProcessSession implements ProcessSession { } @Override - public void transfer(final FlowFile flowFile, final Relationship relationship) { + public void transfer(FlowFile flowFile, final Relationship relationship) { if (relationship == Relationship.SELF) { transfer(flowFile); return; @@ -757,7 +787,7 @@ public class MockProcessSession implements ProcessSession { throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known"); } - validateState(flowFile); + flowFile = validateState(flowFile); List list = transferMap.computeIfAbsent(relationship, r -> new ArrayList<>()); beingProcessed.remove(flowFile.getId()); @@ -766,7 +796,7 @@ public class MockProcessSession implements ProcessSession { } @Override - public void transfer(final Collection flowFiles, final Relationship relationship) { + public void transfer(Collection flowFiles, final Relationship relationship) { if (relationship == Relationship.SELF) { transfer(flowFiles); return; @@ -777,8 +807,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) { - validateState(flowFile); + public MockFlowFile write(FlowFile flowFile, final OutputStreamCallback callback) { + flowFile = validateState(flowFile); if (callback == null || flowFile == null) { throw new IllegalArgumentException("argument cannot be null"); } @@ -788,13 +818,13 @@ public class MockProcessSession implements ProcessSession { final MockFlowFile mock = (MockFlowFile) flowFile; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - 
recursionSet.add(flowFile); + writeRecursionSet.add(flowFile); try { callback.process(baos); } catch (final IOException e) { throw new ProcessException(e.toString(), e); } finally { - recursionSet.remove(flowFile); + writeRecursionSet.remove(flowFile); } final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); @@ -805,15 +835,35 @@ public class MockProcessSession implements ProcessSession { } @Override - public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) { - validateState(flowFile); - if (callback == null || flowFile == null) { - throw new IllegalArgumentException("argument cannot be null"); - } + public OutputStream write(FlowFile flowFile) { if (!(flowFile instanceof MockFlowFile)) { throw new IllegalArgumentException("Cannot export a flow file that I did not create"); } - final MockFlowFile mock = (MockFlowFile) flowFile; + + final MockFlowFile mockFlowFile = validateState(flowFile); + writeRecursionSet.add(flowFile); + final ByteArrayOutputStream baos = new ByteArrayOutputStream() { + @Override + public void close() throws IOException { + super.close(); + + writeRecursionSet.remove(mockFlowFile); + final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.setData(toByteArray()); + } + }; + + return baos; + } + + @Override + public FlowFile append(FlowFile flowFile, final OutputStreamCallback callback) { + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + final MockFlowFile mock = validateState(flowFile); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { @@ -832,25 +882,21 @@ public class MockProcessSession implements ProcessSession { @Override public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) { - validateState(flowFile); if (callback == null || flowFile == null) { throw new 
IllegalArgumentException("argument cannot be null"); } - if (!(flowFile instanceof MockFlowFile)) { - throw new IllegalArgumentException("Cannot export a flow file that I did not create"); - } - final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile mock = validateState(flowFile); final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData()); final ByteArrayOutputStream out = new ByteArrayOutputStream(); - recursionSet.add(flowFile); + writeRecursionSet.add(flowFile); try { callback.process(in, out); } catch (final IOException e) { throw new ProcessException(e.toString(), e); } finally { - recursionSet.remove(flowFile); + writeRecursionSet.remove(flowFile); } final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); @@ -912,10 +958,8 @@ public class MockProcessSession implements ProcessSession { @Override public MockFlowFile merge(Collection sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) { - for (final FlowFile flowFile : sources) { - validateState(flowFile); - } - validateState(destination); + sources = validateState(sources); + destination = validateState(destination); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); try { @@ -945,30 +989,37 @@ public class MockProcessSession implements ProcessSession { return newFlowFile; } - private void validateState(final FlowFile flowFile) { + private List validateState(final Collection flowFiles) { + return flowFiles.stream() + .map(ff -> validateState(ff)) + .collect(Collectors.toList()); + } + + private MockFlowFile validateState(final FlowFile flowFile) { Objects.requireNonNull(flowFile); - ensureCurrentVersion(flowFile); - if (recursionSet.contains(flowFile)) { + + final MockFlowFile currentVersion = currentVersions.get(flowFile.getId()); + if (currentVersion == null) { + throw new FlowFileHandlingException(flowFile + " is not known in this session"); + } + + if (readRecursionSet.containsKey(flowFile)) { throw new 
IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"); } + if (writeRecursionSet.contains(flowFile)) { + throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed"); + } + for (final List flowFiles : transferMap.values()) { if (flowFiles.contains(flowFile)) { throw new IllegalStateException(flowFile + " has already been transferred"); } } + + return currentVersion; } - private void ensureCurrentVersion(final FlowFile flowFile) { - final FlowFile currentVersion = currentVersions.get(flowFile.getId()); - if (currentVersion == null) { - throw new FlowFileHandlingException(flowFile + " is not known in this session"); - } - - if (currentVersion != flowFile) { - throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session"); - } - } /** * Inherits the attributes from the given source flow file into another flow @@ -1235,8 +1286,8 @@ public class MockProcessSession implements ProcessSession { } @Override - public MockFlowFile penalize(final FlowFile flowFile) { - validateState(flowFile); + public MockFlowFile penalize(FlowFile flowFile) { + flowFile = validateState(flowFile); final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; final MockFlowFile newFlowFile = new MockFlowFile(mockFlowFile.getId(), flowFile); currentVersions.put(newFlowFile.getId(), newFlowFile); @@ -1245,8 +1296,8 @@ public class MockProcessSession implements ProcessSession { return newFlowFile; } - public byte[] getContentAsByteArray(final MockFlowFile flowFile) { - validateState(flowFile); + public byte[] getContentAsByteArray(MockFlowFile flowFile) { + flowFile = validateState(flowFile); return flowFile.getData(); } diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index d1c2bea7e9..1c554d2007 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -38,11 +38,6 @@ import org.junit.Test; public class TestMockProcessSession { - @Test(expected = AssertionError.class) - public void testPenalizeFlowFileFromProcessor() { - TestRunners.newTestRunner(PoorlyBehavedProcessor.class).run(); - } - @Test public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException { final Processor processor = new PoorlyBehavedProcessor(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index 90c9dc7918..73b6c5bd80 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@ -251,5 +251,10 @@ public class BatchingSessionFactory implements ProcessSessionFactory { public ProvenanceReporter getProvenanceReporter() { return session.getProvenanceReporter(); } + + @Override + public OutputStream write(final FlowFile source) { + return session.write(source); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 512461f8a5..b3079303f0 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -110,7 +111,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Map counters = new HashMap<>(); private final Map appendableStreams = new HashMap<>(); private final ProcessContext context; - private final Set recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring + private final Map readRecursionSet = new HashMap<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring + private final Set writeRecursionSet = new HashSet<>(); private final Map deleteOnCommit = new HashMap<>(); private final long sessionId; private final String connectableDescription; @@ -133,6 +135,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // List of InputStreams that have been opened by calls to {@link #read(FlowFile)} and not yet closed private final Map openInputStreams = new HashMap<>(); + // List of OutputStreams that have been opened by calls to {@link #write(FlowFile)} and not yet closed + private final Map openOutputStreams = new HashMap<>(); // maps a FlowFile to all Provenance Events that were generated for that FlowFile. 
// we do this so that if we generate a Fork event, for example, and then remove the event in the same @@ -187,14 +191,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE processingStartTime = System.nanoTime(); } - public void checkpoint() { - - resetWriteClaims(false); - - final Map openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List - for (final Map.Entry entry : openStreamCopy.entrySet()) { + private void closeStreams(final Map streamMap) { + final Map openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the List + for (final Map.Entry entry : openStreamCopy.entrySet()) { final FlowFile flowFile = entry.getKey(); - final InputStream openStream = entry.getValue(); + final Closeable openStream = entry.getValue(); LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile); @@ -205,8 +206,19 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE LOG.warn("", e); } } + } - if (!recursionSet.isEmpty()) { + public void checkpoint() { + + resetWriteClaims(false); + + closeStreams(openInputStreams); + closeStreams(openOutputStreams); + + if (!readRecursionSet.isEmpty()) { + throw new IllegalStateException(); + } + if (!writeRecursionSet.isEmpty()) { throw new IllegalStateException(); } @@ -902,19 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE deleteOnCommit.clear(); - final Map openStreamCopy = new HashMap<>(openInputStreams); // avoid ConcurrentModificationException by creating a copy of the List - for (final Map.Entry entry : openStreamCopy.entrySet()) { - final FlowFile flowFile = entry.getKey(); - final InputStream openStream = entry.getValue(); - - LOG.debug("{} closing {} for {} due to session rollback", this, openStream, flowFile); - try { - openStream.close(); - } 
catch (final Exception e) { - LOG.warn("{} Attempted to close {} for {} due to session rollback but close failed", this, openStream, this.connectableDescription); - LOG.warn("", e); - } - } + closeStreams(openInputStreams); + closeStreams(openOutputStreams); try { claimCache.reset(); @@ -1092,7 +1093,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void resetState() { records.clear(); - recursionSet.clear(); + readRecursionSet.clear(); + writeRecursionSet.clear(); contentSizeIn = 0L; contentSizeOut = 0L; flowFilesIn = 0; @@ -1142,12 +1144,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE for (final FlowFile flowFile : flowFiles) { if (openInputStreams.containsKey(flowFile)) { throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " - + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)"); + + "has an open InputStream for the FlowFile, created by calling ProcessSession.read(FlowFile)"); } - if (recursionSet.contains(flowFile)) { + if (openOutputStreams.containsKey(flowFile)) { + throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " + + "has an open OutputStream for the FlowFile, created by calling ProcessSession.write(FlowFile)"); + } + + if (readRecursionSet.containsKey(flowFile)) { throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"); } + if (writeRecursionSet.contains(flowFile)) { + throw new IllegalStateException(flowFile + " already in use for an active callback or OutputStream created by ProcessSession.write(FlowFile) has not been closed"); + } + final StandardRepositoryRecord record = records.get(flowFile); if (record == null) { throw new FlowFileHandlingException(flowFile + " is not known in this 
session (" + toString() + ")"); @@ -1614,8 +1625,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile clone(final FlowFile example, final long offset, final long size) { - validateRecordState(example); + public FlowFile clone(FlowFile example, final long offset, final long size) { + example = validateRecordState(example); final StandardRepositoryRecord exampleRepoRecord = records.get(example); final FlowFileRecord currRec = exampleRepoRecord.getCurrent(); final ContentClaim claim = exampleRepoRecord.getCurrentClaim(); @@ -1683,8 +1694,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile penalize(final FlowFile flowFile) { - validateRecordState(flowFile); + public FlowFile penalize(FlowFile flowFile) { + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); final long expirationEpochMillis = System.currentTimeMillis() + context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).penaltyExpirationTime(expirationEpochMillis).build(); @@ -1693,8 +1704,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile putAttribute(final FlowFile flowFile, final String key, final String value) { - validateRecordState(flowFile); + public FlowFile putAttribute(FlowFile flowFile, final String key, final String value) { + flowFile = validateRecordState(flowFile); if (CoreAttributes.UUID.key().equals(key)) { return flowFile; @@ -1708,8 +1719,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile putAllAttributes(final FlowFile flowFile, final Map attributes) { - validateRecordState(flowFile); + public FlowFile putAllAttributes(FlowFile flowFile, final Map attributes) { + flowFile = 
validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); final Map updatedAttributes; @@ -1728,8 +1739,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile removeAttribute(final FlowFile flowFile, final String key) { - validateRecordState(flowFile); + public FlowFile removeAttribute(FlowFile flowFile, final String key) { + flowFile = validateRecordState(flowFile); if (CoreAttributes.UUID.key().equals(key)) { return flowFile; @@ -1742,8 +1753,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile removeAllAttributes(final FlowFile flowFile, final Set keys) { - validateRecordState(flowFile); + public FlowFile removeAllAttributes(FlowFile flowFile, final Set keys) { + flowFile = validateRecordState(flowFile); if (keys == null) { return flowFile; @@ -1766,8 +1777,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) { - validateRecordState(flowFile); + public FlowFile removeAllAttributes(FlowFile flowFile, final Pattern keyPattern) { + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build(); @@ -1800,8 +1811,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void transfer(final FlowFile flowFile, final Relationship relationship) { - validateRecordState(flowFile); + public void transfer(FlowFile flowFile, final Relationship relationship) { + flowFile = validateRecordState(flowFile); final int numDestinations = context.getConnections(relationship).size(); final int multiplier = Math.max(1, numDestinations); @@ -1830,8 +1841,8 @@ public final 
class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void transfer(final FlowFile flowFile) { - validateRecordState(flowFile); + public void transfer(FlowFile flowFile) { + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); if (record.getOriginalQueue() == null) { throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self"); @@ -1848,8 +1859,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void transfer(final Collection flowFiles, final Relationship relationship) { - validateRecordState(flowFiles); + public void transfer(Collection flowFiles, final Relationship relationship) { + flowFiles = validateRecordState(flowFiles); boolean autoTerminated = false; boolean selfRelationship = false; @@ -1885,8 +1896,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void remove(final FlowFile flowFile) { - validateRecordState(flowFile); + public void remove(FlowFile flowFile) { + flowFile = validateRecordState(flowFile); final StandardRepositoryRecord record = records.get(flowFile); record.markForDelete(); removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key())); @@ -1906,8 +1917,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void remove(final Collection flowFiles) { - validateRecordState(flowFiles); + public void remove(Collection flowFiles) { + flowFiles = validateRecordState(flowFiles); for (final FlowFile flowFile : flowFiles) { final StandardRepositoryRecord record = records.get(flowFile); record.markForDelete(); @@ -2044,7 +2055,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // If the recursion set is empty, we can use the same input stream that we already have open. 
However, if // the recursion set is NOT empty, we can't do this because we may be reading the input of FlowFile 1 while in the // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1. - if (allowCachingOfStream && recursionSet.isEmpty()) { + if (allowCachingOfStream && readRecursionSet.isEmpty() && writeRecursionSet.isEmpty()) { if (currentReadClaim == claim) { if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) { final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed(); @@ -2097,7 +2108,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) { - validateRecordState(source); + source = validateRecordState(source, true); final StandardRepositoryRecord record = records.get(source); try { @@ -2121,7 +2132,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE boolean cnfeThrown = false; try { - recursionSet.add(source); + incrementReadCount(source); reader.process(ffais); // Allow processors to close the file after reading to avoid too many files open or do smart session stream management. @@ -2133,7 +2144,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE cnfeThrown = true; throw cnfe; } finally { - recursionSet.remove(source); + decrementReadCount(source); bytesRead += countingStream.getBytesRead(); // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. 
@@ -2150,8 +2161,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public InputStream read(final FlowFile source) { - validateRecordState(source); + public InputStream read(FlowFile source) { + source = validateRecordState(source, true); final StandardRepositoryRecord record = records.get(source); try { @@ -2165,6 +2176,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn); final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); + final FlowFile sourceFlowFile = source; final InputStream errorHandlingStream = new InputStream() { private boolean closed = false; @@ -2177,7 +2189,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE close(); throw cnfe; } catch (final FlowFileAccessException ffae) { - LOG.error("Failed to read content from " + source + "; rolling back session", ffae); + LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae); rollback(true); close(); throw ffae; @@ -2198,7 +2210,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE close(); throw cnfe; } catch (final FlowFileAccessException ffae) { - LOG.error("Failed to read content from " + source + "; rolling back session", ffae); + LOG.error("Failed to read content from " + sourceFlowFile + "; rolling back session", ffae); rollback(true); close(); throw ffae; @@ -2207,13 +2219,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void close() throws IOException { + decrementReadCount(sourceFlowFile); + if (!closed) { StandardProcessSession.this.bytesRead += countingStream.getBytesRead(); closed = true; } ffais.close(); - openInputStreams.remove(source); + openInputStreams.remove(sourceFlowFile); } @Override @@ -2243,23 
+2257,42 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public String toString() { - return "ErrorHandlingInputStream[FlowFile=" + source + "]"; + return "ErrorHandlingInputStream[FlowFile=" + sourceFlowFile + "]"; } }; - openInputStreams.put(source, errorHandlingStream); + incrementReadCount(sourceFlowFile); + openInputStreams.put(sourceFlowFile, errorHandlingStream); return errorHandlingStream; } + private void incrementReadCount(final FlowFile flowFile) { + readRecursionSet.compute(flowFile, (ff, count) -> count == null ? 1 : count + 1); + } + + private void decrementReadCount(final FlowFile flowFile) { + final Integer count = readRecursionSet.get(flowFile); + if (count == null) { + return; + } + + final int updatedCount = count - 1; + if (updatedCount == 0) { + readRecursionSet.remove(flowFile); + } else { + readRecursionSet.put(flowFile, updatedCount); + } + } + @Override public FlowFile merge(final Collection sources, final FlowFile destination) { return merge(sources, destination, null, null, null); } @Override - public FlowFile merge(final Collection sources, final FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) { - validateRecordState(sources); - validateRecordState(destination); + public FlowFile merge(Collection sources, FlowFile destination, final byte[] header, final byte[] footer, final byte[] demarcator) { + sources = validateRecordState(sources); + destination = validateRecordState(destination); if (sources.contains(destination)) { throw new IllegalArgumentException("Destination cannot be within sources"); } @@ -2358,8 +2391,125 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile write(final FlowFile source, final OutputStreamCallback writer) { - validateRecordState(source); + public OutputStream write(FlowFile source) { + source = validateRecordState(source); + final StandardRepositoryRecord record = 
records.get(source); + + ContentClaim newClaim = null; + try { + newClaim = claimCache.getContentClaim(); + claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); + ensureNotAppending(newClaim); + + final OutputStream rawStream = claimCache.write(newClaim); + final OutputStream disableOnClose = new DisableOnCloseOutputStream(rawStream); + final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose); + + final FlowFile sourceFlowFile = source; + final ContentClaim updatedClaim = newClaim; + final OutputStream errorHandlingOutputStream = new OutputStream() { + private boolean closed = false; + + @Override + public void write(final int b) throws IOException { + try { + countingOut.write(b); + } catch (final IOException ioe) { + LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe); + rollback(true); + close(); + throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe); + } + } + + @Override + public void write(final byte[] b) throws IOException { + try { + countingOut.write(b); + } catch (final IOException ioe) { + LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe); + rollback(true); + close(); + throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe); + } + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + try { + countingOut.write(b, off, len); + } catch (final IOException ioe) { + LOG.error("Failed to write content to " + sourceFlowFile + "; rolling back session", ioe); + rollback(true); + close(); + throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe); + } + } + + @Override + public void flush() throws IOException { + try { + countingOut.flush(); + } catch (final IOException ioe) { + LOG.error("Failed to write content to " + sourceFlowFile + "; 
rolling back session", ioe); + rollback(true); + close(); + throw new FlowFileAccessException("Failed to write to Content Repository for " + sourceFlowFile, ioe); + } + } + + @Override + public void close() throws IOException { + writeRecursionSet.remove(sourceFlowFile); + + final long bytesWritten = countingOut.getBytesWritten(); + if (!closed) { + StandardProcessSession.this.bytesWritten += bytesWritten; + closed = true; + } + + openOutputStreams.remove(sourceFlowFile); + + removeTemporaryClaim(record); + + flush(); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(updatedClaim) + .contentClaimOffset(Math.max(0, updatedClaim.getLength() - bytesWritten)) + .size(bytesWritten) + .build(); + + record.setWorking(newFile); + } + }; + + writeRecursionSet.add(source); + openOutputStreams.put(source, errorHandlingOutputStream); + return errorHandlingOutputStream; + } catch (final ContentNotFoundException nfe) { + resetWriteClaims(); // need to reset write claim before we can remove the claim + destroyContent(newClaim); + handleContentNotFound(nfe, record); + throw nfe; + } catch (final FlowFileAccessException ffae) { + resetWriteClaims(); // need to reset write claim before we can remove the claim + destroyContent(newClaim); + throw ffae; + } catch (final IOException ioe) { + resetWriteClaims(); // need to reset write claim before we can remove the claim + destroyContent(newClaim); + throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe); + } catch (final Throwable t) { + resetWriteClaims(); // need to reset write claim before we can remove the claim + destroyContent(newClaim); + throw t; + } + } + + @Override + public FlowFile write(FlowFile source, final OutputStreamCallback writer) { + source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); long writtenToFlowFile = 0L; @@ -2373,14 +2523,14 @@ public 
final class StandardProcessSession implements ProcessSession, ProvenanceE final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream); final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) { try { - recursionSet.add(source); + writeRecursionSet.add(source); writer.process(new FlowFileAccessOutputStream(countingOut, source)); } finally { writtenToFlowFile = countingOut.getBytesWritten(); bytesWritten += countingOut.getBytesWritten(); } } finally { - recursionSet.remove(source); + writeRecursionSet.remove(source); } } catch (final ContentNotFoundException nfe) { resetWriteClaims(); // need to reset write claim before we can remove the claim @@ -2412,9 +2562,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return newFile; } + @Override - public FlowFile append(final FlowFile source, final OutputStreamCallback writer) { - validateRecordState(source); + public FlowFile append(FlowFile source, final OutputStreamCallback writer) { + source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); long newSize = 0L; @@ -2445,10 +2596,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // wrap our OutputStreams so that the processor cannot close it try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) { - recursionSet.add(source); + writeRecursionSet.add(source); writer.process(new FlowFileAccessOutputStream(disableOnClose, source)); } finally { - recursionSet.remove(source); + writeRecursionSet.remove(source); } } } else { @@ -2458,10 +2609,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // wrap our OutputStreams so that the processor cannot close it try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream); final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) { - 
recursionSet.add(source); + writeRecursionSet.add(source); writer.process(flowFileAccessOutStream); } finally { - recursionSet.remove(source); + writeRecursionSet.remove(source); } } @@ -2593,8 +2744,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override - public FlowFile write(final FlowFile source, final StreamCallback writer) { - validateRecordState(source); + public FlowFile write(FlowFile source, final StreamCallback writer) { + source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); final ContentClaim currClaim = record.getCurrentClaim(); @@ -2618,7 +2769,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os); final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) { - recursionSet.add(source); + writeRecursionSet.add(source); // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -2637,7 +2788,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE writtenToFlowFile = countingOut.getBytesWritten(); this.bytesWritten += writtenToFlowFile; this.bytesRead += countingIn.getBytesRead(); - recursionSet.remove(source); + writeRecursionSet.remove(source); // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. 
if (!cnfeThrown && ffais.getContentNotFoundException() != null) { @@ -2672,8 +2823,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) { - validateRecordState(destination); + public FlowFile importFrom(final Path source, final boolean keepSourceFile, FlowFile destination) { + destination = validateRecordState(destination); // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true. if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) { // If we do NOT want to keep the file, ensure that we can delete it, or else error. @@ -2722,8 +2873,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public FlowFile importFrom(final InputStream source, final FlowFile destination) { - validateRecordState(destination); + public FlowFile importFrom(final InputStream source, FlowFile destination) { + destination = validateRecordState(destination); final StandardRepositoryRecord record = records.get(destination); ContentClaim newClaim = null; final long claimOffset = 0L; @@ -2759,8 +2910,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void exportTo(final FlowFile source, final Path destination, final boolean append) { - validateRecordState(source); + public void exportTo(FlowFile source, final Path destination, final boolean append) { + source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); try { ensureNotAppending(record.getCurrentClaim()); @@ -2777,8 +2928,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } @Override - public void exportTo(final FlowFile source, final OutputStream 
destination) { - validateRecordState(source); + public void exportTo(FlowFile source, final OutputStream destination) { + source = validateRecordState(source); final StandardRepositoryRecord record = records.get(source); if(record.getCurrentClaim() == null) { @@ -2806,13 +2957,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE boolean cnfeThrown = false; try { - recursionSet.add(source); + incrementReadCount(source); StreamUtils.copy(ffais, destination, source.getSize()); } catch (final ContentNotFoundException cnfe) { cnfeThrown = true; throw cnfe; } finally { - recursionSet.remove(source); + decrementReadCount(source); + IOUtils.closeQuietly(ffais); // if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate. if (!cnfeThrown && ffais.getContentNotFoundException() != null) { @@ -2855,35 +3007,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - private void validateRecordState(final FlowFile... 
flowFiles) { - for (final FlowFile file : flowFiles) { - if (recursionSet.contains(file)) { - throw new IllegalStateException(file + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed"); - } - final StandardRepositoryRecord record = records.get(file); - if (record == null) { - rollback(); - throw new FlowFileHandlingException(file + " is not known in this session (" + toString() + ")"); - } - if (record.getCurrent() != file) { - rollback(); - throw new FlowFileHandlingException(file + " is not the most recent version of this FlowFile within this session (" + toString() + ")"); - } - if (record.getTransferRelationship() != null) { - rollback(); - throw new FlowFileHandlingException(file + " is already marked for transfer"); - } - if (record.isMarkedForDelete()) { - rollback(); - throw new FlowFileHandlingException(file + " has already been marked for removal"); - } - } + private FlowFile validateRecordState(final FlowFile flowFile) { + return validateRecordState(flowFile, false); } - private void validateRecordState(final Collection flowFiles) { - for (final FlowFile flowFile : flowFiles) { - validateRecordState(flowFile); + private FlowFile validateRecordState(final FlowFile flowFile, final boolean allowRecursiveRead) { + if (!allowRecursiveRead && readRecursionSet.containsKey(flowFile)) { + throw new IllegalStateException(flowFile + " already in use for an active callback or an InputStream created by ProcessSession.read(FlowFile) has not been closed"); } + if (writeRecursionSet.contains(flowFile)) { + throw new IllegalStateException(flowFile + " already in use for an active callback or an OutputStream created by ProcessSession.write(FlowFile) has not been closed"); + } + + final StandardRepositoryRecord record = records.get(flowFile); + if (record == null) { + rollback(); + throw new FlowFileHandlingException(flowFile + " is not known in this session (" + toString() + ")"); + } + if 
(record.getTransferRelationship() != null) { + rollback(); + throw new FlowFileHandlingException(flowFile + " is already marked for transfer"); + } + if (record.isMarkedForDelete()) { + rollback(); + throw new FlowFileHandlingException(flowFile + " has already been marked for removal"); + } + + return record.getCurrent(); + } + + private List validateRecordState(final Collection flowFiles) { + final List current = new ArrayList<>(flowFiles.size()); + for (final FlowFile flowFile : flowFiles) { + current.add(validateRecordState(flowFile)); + } + return current; } /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 1dcb9919dc..5a939aecbe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -1509,7 +1509,17 @@ public class TestStandardProcessSession { StreamUtils.fillBuffer(in, buffer); assertEquals("hello, world", new String(buffer)); + try { + session.remove(flowFile); + Assert.fail("Was able to remove FlowFile while an InputStream is open for it"); + } catch (final IllegalStateException e) { + // expected + } + + in.close(); + session.remove(flowFile); + session.commit(); // This should generate a WARN log message. We can't really test this in a unit test but can verify manually. 
} @@ -1544,6 +1554,96 @@ public class TestStandardProcessSession { session.commit(); } + @Test + public void testWriteToOutputStream() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(12L) + .build(); + + flowFileQueue.put(flowFileRecord); + + FlowFile flowFile = session.get(); + try (final OutputStream out = session.write(flowFile)) { + out.write("hello, world".getBytes()); + } + + // Call putAllAttributes, because this will return to us the most recent version + // of the FlowFile. In a Processor, we wouldn't need this, but for testing purposes + // we need it in order to get the Content Claim. + flowFile = session.putAllAttributes(flowFile, Collections.emptyMap()); + assertEquals(12L, flowFile.getSize()); + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + try (final InputStream in = session.read(flowFile)) { + StreamUtils.fillBuffer(in, buffer); + } + + assertEquals(new String(buffer), "hello, world"); + } + + @Test + public void testWriteToOutputStreamWhileReading() throws IOException { + final ContentClaim claim = contentRepo.create(false); + try (final OutputStream out = contentRepo.write(claim)) { + out.write("hello, world".getBytes()); + } + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .contentClaim(claim) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(12L) + .build(); + flowFileQueue.put(flowFileRecord); + + final FlowFile flowFile = session.get(); + InputStream in = session.read(flowFile); + + try { + session.write(flowFile); + Assert.fail("Was able to obtain an OutputStream for a FlowFile while also holding an InputStream for it"); + } catch (final IllegalStateException e) { + // expected + } finally { + in.close(); + } + + // Should now be okay + try (final OutputStream out = 
session.write(flowFile)) { + + } + } + + @Test + public void testReadFromInputStreamWhileWriting() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(12L) + .build(); + flowFileQueue.put(flowFileRecord); + + final FlowFile flowFile = session.get(); + OutputStream out = session.write(flowFile); + + try { + session.read(flowFile); + Assert.fail("Was able to obtain an InputStream for a FlowFile while also holding an OutputStream for it"); + } catch (final IllegalStateException e) { + // expected + } finally { + out.close(); + } + + // Should now be okay + try (final InputStream in = session.read(flowFile)) { + + } + } + + @Test public void testTransferUnknownRelationship() { final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() @@ -1672,6 +1772,31 @@ public class TestStandardProcessSession { assertEquals(5, transientClaims.size()); } + @Test + public void testMultipleReadCounts() throws IOException { + flowFileQueue.put(new MockFlowFile(1L)); + + FlowFile flowFile = session.get(); + + final List streams = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + streams.add(session.read(flowFile)); + } + + for (int i = 0; i < 3; i++) { + try { + flowFile = session.putAttribute(flowFile, "counter", String.valueOf(i)); + Assert.fail("Was able to put attribute while reading"); + } catch (final IllegalStateException ise) { + // expected + } + + streams.get(i).close(); + } + + flowFile = session.putAttribute(flowFile, "counter", "4"); + } + private static class MockFlowFileRepository implements FlowFileRepository {