diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java index b427e06f23..f95c4702f1 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java @@ -48,10 +48,10 @@ public class Bin { private volatile int maximumEntries = Integer.MAX_VALUE; private final String fileCountAttribute; - final List binContents = new ArrayList<>(); + private final List binContents = new ArrayList<>(); private final Set binIndexSet = new HashSet<>(); - long size; - int successiveFailedOfferings = 0; + private long size; + private int successiveFailedOfferings = 0; /** * Constructs a new bin @@ -141,11 +141,11 @@ public class Bin { if (fileCountAttribute != null) { final String countValue = flowFile.getAttribute(fileCountAttribute); final Integer count = toInteger(countValue); - if (count != null) { - int currentMaxEntries = this.maximumEntries; - this.maximumEntries = Math.min(count, currentMaxEntries); - this.minimumEntries = currentMaxEntries; + if (count == null) { + return false; } + this.maximumEntries = count; + this.minimumEntries = count; final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE); if (index == null || index.isEmpty() || !binIndexSet.add(index)) { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 643aae4157..bad0dedcf8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -72,9 +72,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { .build(); public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder() .name("Maximum Number of Entries") - .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.") + .description("The maximum number of files to include in a bundle") .defaultValue("1000") - .required(false) + .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java index e6cec7858f..60c29665d2 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java @@ -76,6 +76,10 @@ public class BinManager { this.fileCountAttribute.set(fileCountAttribute); } + public String getFileCountAttribute() { + return fileCountAttribute.get(); + } + public void setMinimumEntries(final int minimumEntries) { this.minEntries.set(minimumEntries); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index af448d640e..0af5f8b20a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -87,6 +87,9 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_MERGED, 1); runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assertEquals(1024 * 6, bundle.getSize()); + // Queue should not be empty because the first FlowFile will be transferred back to the input queue // when we run out @OnStopped logic, since it won't be transferred to any bin. runner.assertQueueNotEmpty(); @@ -886,6 +889,33 @@ public class TestMergeContent { runner.assertTransferCount(MergeContent.REL_MERGED, 0); } + @Test + public void testDefragmentWithTooManyFragements() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + + final Map attributes = new HashMap<>(); + attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1"); + attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4"); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1"); + + runner.enqueue("A Man ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2"); + runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3"); + runner.enqueue("A Canal ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4"); + runner.enqueue("Panama".getBytes("UTF-8"), attributes); + + runner.run(); + + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + } + @Test public void testDefragmentWithTooFewFragments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent());