NIFI-10818: Fixed bug in which oldest bin in MergeContent was not being evicted in order to make room for a new bin when using a correlation attribute. Added additional tests for JoinEnrichment to ensure that the changes had no adverse effects on that processor. Found a bug when running that test in MockProcessSession so also addressed the bug (ConcurrentModificationException).

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6668
This commit is contained in:
Mark Payne 2022-11-16 14:35:24 -05:00 committed by Matthew Burgess
parent 30facedc43
commit d7f2eb7c26
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 107 additions and 13 deletions

View File

@ -136,7 +136,7 @@ public class MockProcessSession implements ProcessSession {
}
public void migrate(final ProcessSession newOwner) {
migrate(newOwner, (Collection) currentVersions.values());
migrate(newOwner, new ArrayList<>((Collection) currentVersions.values()));
}
@Override

View File

@ -174,13 +174,13 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final int totalBinCount = binManager.getBinCount() + readyBins.size();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
final int flowFilesBinned;
final BinningResult binningResult;
if (totalBinCount < maxBinCount) {
flowFilesBinned = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
if (totalBinCount <= maxBinCount) {
binningResult = binFlowFiles(context, sessionFactory);
getLogger().debug("Binned {} FlowFiles", binningResult.getFlowFilesBinned());
} else {
flowFilesBinned = 0;
binningResult = BinningResult.EMPTY;
getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
+ "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
}
@ -189,25 +189,28 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
return;
}
final int binsMigrated = migrateBins(context, flowFilesBinned == 0);
final int binsMigrated = migrateBins(context, binningResult.getFlowFilesBinned() == 0, binningResult.isNewBinNeeded());
final int binsProcessed = processBins(context, sessionFactory);
//If we accomplished nothing then let's yield
if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
if (binningResult.getFlowFilesBinned() == 0 && binsMigrated == 0 && binsProcessed == 0) {
context.yield();
}
}
private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint) {
private int migrateBins(final ProcessContext context, final boolean relaxFullnessConstraint, final boolean newBinNeeded) {
int added = 0;
for (final Bin bin : binManager.removeReadyBins(relaxFullnessConstraint)) {
this.readyBins.add(bin);
added++;
}
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// Evict the oldest bin if we were not able to evict any based on size (added = 0) and either we need a new bin,
// or we've already created too many. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now.
if (added == 0 && binManager.getBinCount() > context.getProperty(MAX_BIN_COUNT).asInteger()) {
final int currentBinCount = binManager.getBinCount();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
if (added == 0 && ((currentBinCount > maxBinCount) || (currentBinCount == maxBinCount && newBinNeeded))) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
@ -261,11 +264,12 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
return processedBins;
}
private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
private BinningResult binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
int flowFilesBinned = 0;
final ProcessSession session = sessionFactory.createSession();
final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
boolean newBinNeeded = false;
while (binManager.getBinCount() <= maxBinCount) {
if (!isScheduled()) {
break;
@ -292,16 +296,21 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
if (!unbinned.isEmpty()) {
newBinNeeded = true;
}
for (final FlowFile flowFile : unbinned) {
Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
bin.offer(flowFile, session);
this.readyBins.add(bin);
}
flowFilesBinned += entry.getValue().size();
}
}
return flowFilesBinned;
return new BinningResult(flowFilesBinned, newBinNeeded);
}
@OnScheduled
@ -384,4 +393,24 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
return problems;
}
private static class BinningResult {
private final int flowFilesBinned;
private final boolean newBinNeeded;
public BinningResult(final int flowFilesBinned, final boolean newBinNeeded) {
this.flowFilesBinned = flowFilesBinned;
this.newBinNeeded = newBinNeeded;
}
public int getFlowFilesBinned() {
return flowFilesBinned;
}
public boolean isNewBinNeeded() {
return newBinNeeded;
}
public static BinningResult EMPTY = new BinningResult(0, false);
}
}

View File

@ -172,6 +172,7 @@ public class JoinEnrichment extends BinFiles {
"does not point to any existing field in the original Record, the enrichment will not be inserted.")
.required(true)
.addValidator(new RecordPathValidator())
.defaultValue("/")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dependsOn(JOIN_STRATEGY, JOIN_INSERT_ENRICHMENT_FIELDS)
.build();

View File

@ -41,6 +41,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -50,10 +51,73 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestJoinEnrichment {
private static final File EXAMPLES_DIR = new File("src/test/resources/TestJoinEnrichment");
@Test
public void testManyQueued() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
final ArrayListRecordWriter writer = setupCsvServices(runner);
runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_SQL);
runner.setProperty(JoinEnrichment.SQL, "SELECT original.i, original.lower_letter, enrichment.upper_letter FROM original JOIN enrichment ON original.i = enrichment.i");
// Enqueue a flowfile where i=0, lower_letter=a; another with i=1, lower_letter=b; etc. up to i=25, lower_letter=z
for (int i=0; i < 26; i++) {
final Map<String, String> originalAttributes = new HashMap<>();
originalAttributes.put("enrichment.group.id", String.valueOf(i));
originalAttributes.put("enrichment.role", "ORIGINAL");
final char letter = (char) ('a' + i);
runner.enqueue("i,lower_letter\n" + i + "," + letter, originalAttributes);
}
// Enqueue a flowfile where i=0, upper_letter=A; another with i=1, upper_letter=B; etc. up to i=25, upper_letter=Z
for (int i=0; i < 26; i++) {
final Map<String, String> enrichmentAttributes = new HashMap<>();
enrichmentAttributes.put("enrichment.group.id", String.valueOf(i));
enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
final char letter = (char) ('A' + i);
runner.enqueue("i,upper_letter\n" + i + "," + letter, enrichmentAttributes);
}
runner.run();
// Ensure that the result is i=0,lower_letter=a,upper_letter=A ... i=25,lower_letter=z,upper_letter=Z
runner.assertTransferCount(JoinEnrichment.REL_JOINED, 26);
runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 52);
final List<Record> written = writer.getRecordsWritten();
assertEquals(26, written.size());
final BitSet found = new BitSet();
for (final Record outRecord : written) {
final RecordSchema schema = outRecord.getSchema();
assertEquals(RecordFieldType.STRING, schema.getField("i").get().getDataType().getFieldType());
assertEquals(RecordFieldType.STRING, schema.getField("lower_letter").get().getDataType().getFieldType());
assertEquals(RecordFieldType.STRING, schema.getField("upper_letter").get().getDataType().getFieldType());
final int id = outRecord.getAsInt("i");
final String expectedLower = "" + ((char) ('a' + id));
assertEquals(expectedLower, outRecord.getValue("lower_letter"));
final String expectedUpper = "" + ((char) ('A' + id));
assertEquals(expectedUpper, outRecord.getValue("upper_letter"));
assertEquals(outRecord.getAsString("lower_letter"), outRecord.getAsString("upper_letter").toLowerCase());
found.set(id);
}
for (int i=0; i < 26; i++) {
assertTrue(found.get(i));
}
}
@Test
public void testSimpleSqlJoin() throws InitializationException {