From e9af6c6ad85bb7eafbd8d0703e783032120ea577 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 30 Jan 2018 15:21:00 +0900 Subject: [PATCH] NIFI-4828: Fix MergeContent to process all ready bins Before this fix, MergeContent only processed the first bin even if there were multiple bins. There were two unit tests marked with Ignore that had been failing because of this. This closes #2444. Signed-off-by: Mark Payne --- .../nifi/processor/util/bin/BinFiles.java | 60 +++++++++---------- .../nifi/processor/util/bin/BinManager.java | 49 ++++++--------- .../processors/standard/TestMergeContent.java | 7 +-- 3 files changed, 48 insertions(+), 68 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 7f79b708c5..643aae4157 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -217,48 +217,45 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { } private int processBins(final ProcessContext context) { - final Bin bin = readyBins.poll(); - if (bin == null) { - return 0; - } - - final List bins = new ArrayList<>(); - bins.add(bin); - final ComponentLog logger = getLogger(); + int processedBins = 0; + Bin bin; + while ((bin = readyBins.poll()) != null) { + boolean binAlreadyCommitted; + try { + binAlreadyCommitted = this.processBin(bin, context); + } catch (final ProcessException e) { + logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e}); - boolean binAlreadyCommitted = false; - try { - binAlreadyCommitted = this.processBin(bin, context); - } catch (final 
ProcessException e) { - logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e}); + final ProcessSession binSession = bin.getSession(); + for (final FlowFile flowFile : bin.getContents()) { + binSession.transfer(flowFile, REL_FAILURE); + } + binSession.commit(); + continue; + } catch (final Exception e) { + logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e}); - final ProcessSession binSession = bin.getSession(); - for (final FlowFile flowFile : bin.getContents()) { - binSession.transfer(flowFile, REL_FAILURE); + bin.getSession().rollback(); + continue; } - binSession.commit(); - return 1; - } catch (final Exception e) { - logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e}); - bin.getSession().rollback(); - return 1; + // If this bin's session has been committed, move on. + if (!binAlreadyCommitted) { + final ProcessSession binSession = bin.getSession(); + binSession.transfer(bin.getContents(), REL_ORIGINAL); + binSession.commit(); + } + + processedBins++; } - // If this bin's session has been committed, move on. 
- if (!binAlreadyCommitted) { - final ProcessSession binSession = bin.getSession(); - binSession.transfer(bin.getContents(), REL_ORIGINAL); - binSession.commit(); - } - - return 1; + return processedBins; } private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { int flowFilesBinned = 0; - while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) { + while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger()) { if (!isScheduled()) { break; } @@ -290,6 +287,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { bin.offer(flowFile, session); this.readyBins.add(bin); } + flowFilesBinned += entry.getValue().size(); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java index d6a8567134..e6cec7858f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java @@ -175,40 +175,25 @@ public class BinManager { continue; } - final List currentBins = groupBinMap.get(groupIdentifier); - if (currentBins == null) { // this is a new group we need to register - final List bins = new ArrayList<>(); - final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), - maxEntries.get(), fileCountAttribute.get()); - bins.add(bin); - groupBinMap.put(groupIdentifier, bins); - binCount++; - - final boolean added = bin.offer(flowFile, session); - if (!added) { - unbinned.add(flowFile); + final List currentBins = groupBinMap.computeIfAbsent(groupIdentifier, k -> new ArrayList<>()); + for (final Bin bin : 
currentBins) { + final boolean accepted = bin.offer(flowFile, session); + if (accepted) { + continue flowFileLoop; } - continue; - } else { - for (final Bin bin : currentBins) { - final boolean accepted = bin.offer(flowFile, session); - if (accepted) { - continue flowFileLoop; - } - } - - //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one - final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), - maxEntries.get(), fileCountAttribute.get()); - currentBins.add(bin); - binCount++; - final boolean added = bin.offer(flowFile, session); - if (!added) { - unbinned.add(flowFile); - } - - continue; } + + // if we've reached this point then the groupIdentifier was a brand new one, + // or we couldn't fit it into any existing bins - gotta make a new one + final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), + maxEntries.get(), fileCountAttribute.get()); + currentBins.add(bin); + binCount++; + final boolean added = bin.offer(flowFile, session); + if (!added) { + unbinned.add(flowFile); + } + } } finally { wLock.unlock(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index d4617125d4..3d8e015bba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -57,7 +57,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import 
org.junit.Test; public class TestMergeContent { @@ -911,7 +910,6 @@ public class TestMergeContent { assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); } - @Ignore("this test appears to be faulty") @Test public void testDefragmentMultipleMingledSegments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); @@ -941,7 +939,7 @@ public class TestMergeContent { attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4"); runner.enqueue("Panama".getBytes("UTF-8"), attributes); - runner.run(2); + runner.run(1); runner.assertTransferCount(MergeContent.REL_MERGED, 2); final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); @@ -1007,7 +1005,6 @@ public class TestMergeContent { assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); } - @Ignore("This test appears to be a fail...is retuning 1 instead of 2...needs work") @Test public void testMergeBasedOnCorrelation() throws IOException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); @@ -1028,7 +1025,7 @@ public class TestMergeContent { attributes.put("attr", "b"); runner.enqueue("Panama".getBytes("UTF-8"), attributes); - runner.run(2); + runner.run(1); runner.assertTransferCount(MergeContent.REL_MERGED, 2);