diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java index 399a12b9ae..a7b4b28b2e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java @@ -171,17 +171,17 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { @Override public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - int binsAdded = binFlowFiles(context, sessionFactory); - getLogger().debug("Binned {} FlowFiles", new Object[]{binsAdded}); + final int flowFilesBinned = binFlowFiles(context, sessionFactory); + getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned}); if (!isScheduled()) { return; } - binsAdded += migrateBins(context); - + final int binsMigrated = migrateBins(context); final int binsProcessed = processBins(context, sessionFactory); - if (binsProcessed == 0 && binsAdded == 0) { + //If we accomplished nothing then let's yield + if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) { context.yield(); } } @@ -203,7 +203,6 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { this.readyBins.add(bin); } } - return added; } @@ -251,16 +250,16 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { } private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { - int binsAdded = 0; - while (binManager.getBinCount() < context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) { + int flowFilesBinned = 0; + while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) { if (!isScheduled()) { - return binsAdded; + break; } final ProcessSession session = sessionFactory.createSession(); FlowFile flowFile = session.get(); if (flowFile == null) { - return binsAdded; + break; } flowFile = this.preprocessFlowFile(context, session, flowFile); @@ -276,10 +275,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { this.readyBins.add(bin); } - binsAdded++; + flowFilesBinned++; } - return binsAdded; + return flowFilesBinned; } @OnScheduled diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index aad2593652..65925f7977 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -73,6 +73,26 @@ public class TestMergeContent { bundle.assertContentEquals("Hello, World!".getBytes("UTF-8")); bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); } + + @Test + public void testSimpleBinaryConcatSingleBin() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT); + runner.setProperty(MergeContent.MAX_BIN_COUNT, "1"); + + createFlowFiles(runner); + runner.run(); + + runner.assertQueueEmpty(); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3); + + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + bundle.assertContentEquals("Hello, World!".getBytes("UTF-8")); + bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + } @Test public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, InterruptedException {