diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 7f79b708c5..643aae4157 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -217,48 +217,45 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { } private int processBins(final ProcessContext context) { - final Bin bin = readyBins.poll(); - if (bin == null) { - return 0; - } - - final List bins = new ArrayList<>(); - bins.add(bin); - final ComponentLog logger = getLogger(); + int processedBins = 0; + Bin bin; + while ((bin = readyBins.poll()) != null) { + boolean binAlreadyCommitted; + try { + binAlreadyCommitted = this.processBin(bin, context); + } catch (final ProcessException e) { + logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e}); - boolean binAlreadyCommitted = false; - try { - binAlreadyCommitted = this.processBin(bin, context); - } catch (final ProcessException e) { - logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e}); + final ProcessSession binSession = bin.getSession(); + for (final FlowFile flowFile : bin.getContents()) { + binSession.transfer(flowFile, REL_FAILURE); + } + binSession.commit(); + continue; + } catch (final Exception e) { + logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e}); - final ProcessSession binSession = bin.getSession(); - for (final FlowFile flowFile : bin.getContents()) { - binSession.transfer(flowFile, REL_FAILURE); + bin.getSession().rollback(); + continue; } - binSession.commit(); - return 1; - } catch (final Exception e) { - logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e}); - bin.getSession().rollback(); - return 1; + // If this bin's session has been committed, move on. + if (!binAlreadyCommitted) { + final ProcessSession binSession = bin.getSession(); + binSession.transfer(bin.getContents(), REL_ORIGINAL); + binSession.commit(); + } + + processedBins++; } - // If this bin's session has been committed, move on. - if (!binAlreadyCommitted) { - final ProcessSession binSession = bin.getSession(); - binSession.transfer(bin.getContents(), REL_ORIGINAL); - binSession.commit(); - } - - return 1; + return processedBins; } private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { int flowFilesBinned = 0; - while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) { + while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger()) { if (!isScheduled()) { break; } @@ -290,6 +287,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { bin.offer(flowFile, session); this.readyBins.add(bin); } + flowFilesBinned += entry.getValue().size(); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java index d6a8567134..e6cec7858f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java @@ -175,40 +175,25 @@ public class BinManager { continue; } - final List currentBins = groupBinMap.get(groupIdentifier); - if (currentBins == null) { // this is a new group we need to register - final List bins = new ArrayList<>(); - final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), - maxEntries.get(), fileCountAttribute.get()); - bins.add(bin); - groupBinMap.put(groupIdentifier, bins); - binCount++; - - final boolean added = bin.offer(flowFile, session); - if (!added) { - unbinned.add(flowFile); + final List currentBins = groupBinMap.computeIfAbsent(groupIdentifier, k -> new ArrayList<>()); + for (final Bin bin : currentBins) { + final boolean accepted = bin.offer(flowFile, session); + if (accepted) { + continue flowFileLoop; } - continue; - } else { - for (final Bin bin : currentBins) { - final boolean accepted = bin.offer(flowFile, session); - if (accepted) { - continue flowFileLoop; - } - } - - //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one - final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), - maxEntries.get(), fileCountAttribute.get()); - currentBins.add(bin); - binCount++; - final boolean added = bin.offer(flowFile, session); - if (!added) { - unbinned.add(flowFile); - } - - continue; } + + // if we've reached this point then the groupIdentifier was a brand new one, + // or we couldn't fit it into any existing bins - gotta make a new one + final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), + maxEntries.get(), fileCountAttribute.get()); + currentBins.add(bin); + binCount++; + final boolean added = bin.offer(flowFile, session); + if (!added) { + unbinned.add(flowFile); + } + } } finally { wLock.unlock(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index d4617125d4..3d8e015bba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -57,7 +57,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; public class TestMergeContent { @@ -911,7 +910,6 @@ public class TestMergeContent { assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); } - @Ignore("this test appears to be faulty") @Test public void testDefragmentMultipleMingledSegments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); @@ -941,7 +939,7 @@ public class TestMergeContent { attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4"); runner.enqueue("Panama".getBytes("UTF-8"), attributes); - runner.run(2); + runner.run(1); runner.assertTransferCount(MergeContent.REL_MERGED, 2); final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); @@ -1007,7 +1005,6 @@ public class TestMergeContent { assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); } - @Ignore("This test appears to be a fail...is retuning 1 instead of 2...needs work") @Test public void testMergeBasedOnCorrelation() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); @@ -1028,7 +1025,7 @@ public class TestMergeContent { attributes.put("attr", "b"); runner.enqueue("Panama".getBytes("UTF-8"), attributes); - runner.run(2); + runner.run(1); runner.assertTransferCount(MergeContent.REL_MERGED, 2);