From d7f2eb7c263aa9511dba5a2f5760f779ff1a12f9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 16 Nov 2022 14:35:24 -0500 Subject: [PATCH] NIFI-10818: Fixed bug in which oldest bin in MergeContent was not being evicted in order to make room for a new bin when using a correlation attribute. Added additional tests for JoinEnrichment to ensure that the changes had no adverse effects on that processor. Found a bug when running that test in MockProcessSession so also addressed the bug (ConcurrentModificationException). Signed-off-by: Matthew Burgess This closes #6668 --- .../apache/nifi/util/MockProcessSession.java | 2 +- .../nifi/processor/util/bin/BinFiles.java | 53 +++++++++++---- .../processors/standard/JoinEnrichment.java | 1 + .../standard/TestJoinEnrichment.java | 64 +++++++++++++++++++ 4 files changed, 107 insertions(+), 13 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index bb9d40d5b2..2b8326ed50 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -136,7 +136,7 @@ public class MockProcessSession implements ProcessSession { } public void migrate(final ProcessSession newOwner) { - migrate(newOwner, (Collection) currentVersions.values()); + migrate(newOwner, new ArrayList<>((Collection) currentVersions.values())); } @Override diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index d7a301b3e6..c9d40829dd 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-bin-manager/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -174,13 +174,13 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { final int totalBinCount = binManager.getBinCount() + readyBins.size(); final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger(); - final int flowFilesBinned; + final BinningResult binningResult; - if (totalBinCount < maxBinCount) { - flowFilesBinned = binFlowFiles(context, sessionFactory); - getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned}); + if (totalBinCount <= maxBinCount) { + binningResult = binFlowFiles(context, sessionFactory); + getLogger().debug("Binned {} FlowFiles", binningResult.getFlowFilesBinned()); } else { - flowFilesBinned = 0; + binningResult = BinningResult.EMPTY; getLogger().debug("Will not bin any FlowFiles because {} bins already exist;" + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount}); } @@ -189,25 +189,28 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { return; } - final int binsMigrated = migrateBins(context, flowFilesBinned == 0); + final int binsMigrated = migrateBins(context, binningResult.getFlowFilesBinned() == 0, binningResult.isNewBinNeeded()); final int binsProcessed = processBins(context, sessionFactory); //If we accomplished nothing then let's yield - if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) { + if (binningResult.getFlowFilesBinned() == 0 && binsMigrated == 0 && binsProcessed == 0) { context.yield(); } } - private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint) { + private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint, final boolean newBinNeeded) { int added = 0; for (final Bin bin : binManager.removeReadyBins(relaxFullnessConstraint)) { this.readyBins.add(bin); added++; } - // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do + // Evict the oldest bin if we were not able to evict any based on size (added = 0) and either we need a new bin, + // or we've already created too many. If we don't do // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the // bins. So we may as well expire it now. - if (added == 0 && binManager.getBinCount() > context.getProperty(MAX_BIN_COUNT).asInteger()) { + final int currentBinCount = binManager.getBinCount(); + final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger(); + if (added == 0 && ((currentBinCount > maxBinCount) || (currentBinCount == maxBinCount && newBinNeeded))) { final Bin bin = binManager.removeOldestBin(); if (bin != null) { added++; @@ -261,11 +264,12 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { return processedBins; } - private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + private BinningResult binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { int flowFilesBinned = 0; final ProcessSession session = sessionFactory.createSession(); final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger(); + boolean newBinNeeded = false; while (binManager.getBinCount() <= maxBinCount) { if (!isScheduled()) { break; @@ -292,16 +296,21 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { for (final Map.Entry> entry : flowFileGroups.entrySet()) { final Set unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory); + if (!unbinned.isEmpty()) { + newBinNeeded = true; + } + for (final FlowFile flowFile : unbinned) { Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null); bin.offer(flowFile, session); this.readyBins.add(bin); } + flowFilesBinned += entry.getValue().size(); } } - return flowFilesBinned; + return new BinningResult(flowFilesBinned, newBinNeeded); } @OnScheduled @@ -384,4 +393,24 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { return problems; } + + private static class BinningResult { + private final int flowFilesBinned; + private final boolean newBinNeeded; + + public BinningResult(final int flowFilesBinned, final boolean newBinNeeded) { + this.flowFilesBinned = flowFilesBinned; + this.newBinNeeded = newBinNeeded; + } + + public int getFlowFilesBinned() { + return flowFilesBinned; + } + + public boolean isNewBinNeeded() { + return newBinNeeded; + } + + public static BinningResult EMPTY = new BinningResult(0, false); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java index 73dc768927..34c04f9635 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java @@ -172,6 +172,7 @@ public class JoinEnrichment extends BinFiles { "does not point to any existing field in the original Record, the enrichment will not be inserted.") .required(true) .addValidator(new RecordPathValidator()) + .defaultValue("/") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS) .build(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java index 0f24c99255..000c2051f8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java @@ -41,6 +41,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -50,10 +51,73 @@ import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestJoinEnrichment { private static final File EXAMPLES_DIR = new File("src/test/resources/TestJoinEnrichment"); + @Test + public void testManyQueued() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment()); + + final ArrayListRecordWriter writer = setupCsvServices(runner); + runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_SQL); + runner.setProperty(JoinEnrichment.SQL, "SELECT original.i, original.lower_letter, enrichment.upper_letter FROM original JOIN enrichment ON original.i = enrichment.i"); + + // Enqueue a flowfile where i=0, lower_letter=a; another with i=1, lower_letter=b; etc. up to i=25, lower_letter=z + for (int i=0; i < 26; i++) { + final Map originalAttributes = new HashMap<>(); + originalAttributes.put("enrichment.group.id", String.valueOf(i)); + originalAttributes.put("enrichment.role", "ORIGINAL"); + + final char letter = (char) ('a' + i); + + runner.enqueue("i,lower_letter\n" + i + "," + letter, originalAttributes); + } + + // Enqueue a flowfile where i=0, upper_letter=A; another with i=1, upper_letter=B; etc. up to i=25, upper_letter=Z + for (int i=0; i < 26; i++) { + final Map enrichmentAttributes = new HashMap<>(); + enrichmentAttributes.put("enrichment.group.id", String.valueOf(i)); + enrichmentAttributes.put("enrichment.role", "ENRICHMENT"); + + final char letter = (char) ('A' + i); + runner.enqueue("i,upper_letter\n" + i + "," + letter, enrichmentAttributes); + } + + runner.run(); + + // Ensure that the result is i=0,lower_letter=a,upper_letter=A ... i=25,lower_letter=z,upper_letter=Z + runner.assertTransferCount(JoinEnrichment.REL_JOINED, 26); + runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 52); + + final List written = writer.getRecordsWritten(); + assertEquals(26, written.size()); + + final BitSet found = new BitSet(); + for (final Record outRecord : written) { + final RecordSchema schema = outRecord.getSchema(); + assertEquals(RecordFieldType.STRING, schema.getField("i").get().getDataType().getFieldType()); + assertEquals(RecordFieldType.STRING, schema.getField("lower_letter").get().getDataType().getFieldType()); + assertEquals(RecordFieldType.STRING, schema.getField("upper_letter").get().getDataType().getFieldType()); + + final int id = outRecord.getAsInt("i"); + + final String expectedLower = "" + ((char) ('a' + id)); + assertEquals(expectedLower, outRecord.getValue("lower_letter")); + + final String expectedUpper = "" + ((char) ('A' + id)); + assertEquals(expectedUpper, outRecord.getValue("upper_letter")); + + assertEquals(outRecord.getAsString("lower_letter"), outRecord.getAsString("upper_letter").toLowerCase()); + + found.set(id); + } + + for (int i=0; i < 26; i++) { + assertTrue(found.get(i)); + } + } @Test public void testSimpleSqlJoin() throws InitializationException {