NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in a single onTrigger (#5550)

* NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in a single onTrigger

MergeRecord previously processed the FlowFiles across multiple onTrigger calls and needed one extra onTrigger call
(with no incoming FlowFiles) to realize that no more FlowFiles were available and that it was time to send the merged FlowFile downstream.
This was not compatible with the Stateless Runtime, which does not trigger the flow anymore when no FlowFiles are available.

Also changed the "unschedule" logic in StandardProcessorTestRunner: previously, @OnUnscheduled methods were called immediately after
the 1st FlowFile was processed. Now the processor is unscheduled only at the end of the execution (after onTrigger has finished),
and only if stopOnFinish has been requested by the test case.
This commit is contained in:
Peter Turcsanyi 2022-01-04 17:36:56 +01:00 committed by GitHub
parent e54cbe6451
commit 4346dd8faf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 60 deletions

View File

@ -230,7 +230,7 @@ public class StandardProcessorTestRunner implements TestRunner {
throw new AssertionError(thrown);
}
if (++finishedCount == 1) {
if (++finishedCount == 1 && stopOnFinish) {
unscheduledRun = true;
unSchedule();
}
@ -238,7 +238,7 @@ public class StandardProcessorTestRunner implements TestRunner {
}
}
if (!unscheduledRun) {
if (!unscheduledRun && stopOnFinish) {
unSchedule();
}

View File

@ -323,56 +323,64 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
}
}
final ProcessSession session = sessionFactory.createSession();
final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
if (getLogger().isDebugEnabled()) {
final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
}
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
final boolean block;
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
block = true;
} else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
block = true;
} else {
block = false;
}
try {
for (final FlowFile flowFile : flowFiles) {
try {
binFlowFile(context, flowFile, session, manager, block);
} catch (final Exception e) {
getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
while (isScheduled()) {
final ProcessSession session = sessionFactory.createSession();
final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
if (flowFiles.isEmpty()) {
break;
}
if (getLogger().isDebugEnabled()) {
final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
}
final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
final boolean block;
if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
block = true;
} else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
block = true;
} else {
block = false;
}
} finally {
session.commitAsync();
}
// If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
// Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
int completedBins = 0;
if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
try {
completedBins += manager.completeFullEnoughBins();
for (final FlowFile flowFile : flowFiles) {
try {
binFlowFile(context, flowFile, session, manager, block);
} catch (final Exception e) {
getLogger().error("Failed to bin {} due to {}", flowFile, e, e);
session.transfer(flowFile, REL_FAILURE);
}
}
} finally {
session.commitAsync();
}
// Complete any bins that have reached their expiration date
try {
manager.completeExpiredBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
}
// Complete any bins that have reached their expiration date
try {
completedBins += manager.completeExpiredBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
}
if (isScheduled()) {
// Complete any bins that have reached their expiration date
try {
manager.completeExpiredBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
if (completedBins == 0 && flowFiles.isEmpty()) {
getLogger().debug("No FlowFiles to bin; will yield");
// Complete any bins that meet their minimum size requirements
try {
manager.completeFullEnoughBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
}
getLogger().debug("No more FlowFiles to bin; will yield");
context.yield();
}
}
@ -386,7 +394,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
final RecordSchema schema = reader.getSchema();
final String groupId = getGroupId(context, flowFile, schema, session);
getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, flowFile});
getLogger().debug("Got Group ID {} for {}", groupId, flowFile);
binManager.add(groupId, flowFile, reader, session, block);
} catch (MalformedRecordException | IOException | SchemaNotFoundException e) {

View File

@ -205,6 +205,11 @@ public class RecordBin {
return false;
}
if (thresholds.getFragmentCountAttribute().isPresent()) {
// Defragment strategy: Compare with the target fragment count.
return this.fragmentCount == thresholds.getFragmentCount();
}
int maxRecords = thresholds.getMaxRecords();
if (recordCount >= maxRecords) {
@ -241,7 +246,7 @@ public class RecordBin {
}
if (thresholds.getFragmentCountAttribute().isPresent()) {
// Compare with the target fragment count.
// Defragment strategy: Compare with the target fragment count.
return this.fragmentCount == thresholds.getFragmentCount();
}

View File

@ -78,13 +78,10 @@ public class TestMergeRecord {
@Test
public void testMergeSimple() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
runner.setProperty(MergeRecord.MAX_RECORDS, "2");
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Age\nJane, 34");
runner.run(2);
runner.run(1);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
@ -101,12 +98,11 @@ public class TestMergeRecord {
@Test
public void testDifferentSchema() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
runner.setProperty(MergeRecord.MAX_RECORDS, "2");
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Color\nJane, Red");
runner.run(2, false, true);
runner.run(1, false, true);
runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
@ -114,7 +110,7 @@ public class TestMergeRecord {
runner.enqueue("Name, Age\nJane, 34");
runner.enqueue("Name, Color\nJohn, Blue");
runner.run(2, true, false);
runner.run(1, true, false);
runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
@ -133,8 +129,7 @@ public class TestMergeRecord {
@Test
public void testFailureToParse() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
runner.setProperty(MergeRecord.MAX_RECORDS, "3");
runner.setProperty(MergeRecord.MIN_RECORDS, "3");
readerService.failAfter(2);
@ -329,7 +324,6 @@ public class TestMergeRecord {
@Test
public void testMinSize() {
runner.setProperty(MergeRecord.MIN_RECORDS, "2");
runner.setProperty(MergeRecord.MAX_RECORDS, "2");
runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
runner.enqueue("Name, Age\nJohn, 35");
@ -367,7 +361,6 @@ public class TestMergeRecord {
@Test
public void testMinRecords() {
runner.setProperty(MergeRecord.MIN_RECORDS, "103");
runner.setProperty(MergeRecord.MAX_RECORDS, "110");
runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
runner.enqueue("Name, Age\nJohn, 35");
@ -384,7 +377,7 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
runner.enqueue("Name, Age\nJohn, 35");
runner.run(2);
runner.run();
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
}
@ -477,8 +470,6 @@ public class TestMergeRecord {
@Test
public void testDefragmentOldestBinFailsWhenTooManyBins() {
runner.setProperty(MergeRecord.MIN_RECORDS, "5");
runner.setProperty(MergeRecord.MAX_RECORDS, "10");
runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);