diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index 5ec1714d00..88c582c22b 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -735,7 +735,11 @@ public interface ProcessSession { * FlowFile content; if an attempt is made to access the InputStream * provided to the given InputStreamCallback after this method completed its * execution + * + * @deprecated Restricting the ProcessSession's ability to manage its own streams should not be used. The need for this + * capability was obviated by the introduction of the {@link #migrate(ProcessSession, Collection)} and {@link #migrate(ProcessSession)} methods. */ + @Deprecated void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) throws FlowFileAccessException; /** diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 9dc33c3698..465ab443d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -453,7 +453,7 @@ public class MergeContent extends BinFiles { String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName); // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier - if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { + if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) { groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE); } @@ -462,7 +462,7 @@ public class MergeContent extends BinFiles { @Override protected void setUpBinManager(final BinManager binManager, final ProcessContext context) { - if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { + if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) { binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE); } else { binManager.setFileCountAttribute(null); @@ -505,7 +505,7 @@ public class MergeContent extends BinFiles { final List contents = bin.getContents(); final ProcessSession binSession = bin.getSession(); - if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { + if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(context.getProperty(MERGE_STRATEGY).getValue())) { final String error = getDefragmentValidationError(bin.getContents()); // Fail the flow files and commit them @@ -648,12 +648,7 @@ public class MergeContent extends BinFiles { final Iterator itr = contents.iterator(); while (itr.hasNext()) { final FlowFile flowFile = itr.next(); - bin.getSession().read(flowFile, false, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.copy(in, out); - } - }); + bin.getSession().read(flowFile, in -> StreamUtils.copy(in, out)); if (itr.hasNext()) { if (demarcator != null) { @@ -694,7 +689,7 @@ public class MergeContent extends BinFiles { private byte[] getDelimiterContent(final ProcessContext context, final List wrappers, final PropertyDescriptor descriptor) throws IOException { final String delimiterStrategyValue = context.getProperty(DELIMITER_STRATEGY).getValue(); - if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) { + if (DELIMITER_STRATEGY_FILENAME.getValue().equals(delimiterStrategyValue)) { return getDelimiterFileContent(context, wrappers, descriptor); } else { return getDelimiterTextContent(context, wrappers, descriptor); @@ -881,7 +876,7 @@ public class MergeContent extends BinFiles { final OutputStream out = new NonCloseableOutputStream(bufferedOut); for (final FlowFile flowFile : contents) { - bin.getSession().read(flowFile, false, new InputStreamCallback() { + bin.getSession().read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { @@ -926,7 +921,7 @@ public class MergeContent extends BinFiles { private final int compressionLevel; - private List unmerged = new ArrayList<>(); + private final List unmerged = new ArrayList<>(); public ZipMerge(final int compressionLevel) { this.compressionLevel = compressionLevel; @@ -993,7 +988,7 @@ public class MergeContent extends BinFiles { private class AvroMerge implements MergeBin { - private List unmerged = new ArrayList<>(); + private final List unmerged = new ArrayList<>(); @Override public FlowFile merge(final Bin bin, final ProcessContext context) { @@ -1014,7 +1009,7 @@ public class MergeContent extends BinFiles { public void process(final OutputStream rawOut) throws IOException { try (final OutputStream out = new BufferedOutputStream(rawOut)) { for (final FlowFile flowFile : contents) { - bin.getSession().read(flowFile, false, new InputStreamCallback() { + bin.getSession().read(flowFile, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { boolean canMerge = true;