diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 67e37c28d8..b15d23b65a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -205,7 +205,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the // bins. So we may as well expire it now. - if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) { + if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) { final Bin bin = binManager.removeOldestBin(); if (bin != null) { added++; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index b6025c5482..b8f9210483 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -911,6 +911,24 @@ public class TestMergeContent { bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE); } + @Test + public void testLeavesSmallBinUnmerged() { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MIN_ENTRIES, "5"); + runner.setProperty(MergeContent.MAX_ENTRIES, "5"); + runner.setProperty(MergeContent.MAX_BIN_COUNT, "3"); + + for (int i = 0; i < 17; i++) { + runner.enqueue(String.valueOf(i) + "\n"); + } + + runner.run(5); + + runner.assertTransferCount(MergeContent.REL_MERGED, 3); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 15); + assertEquals(2, runner.getQueueSize().getObjectCount()); + } + private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException { final Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");