NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5507
This commit is contained in:
Mark Payne 2021-11-03 15:37:54 -04:00 committed by Matthew Burgess
parent a9685b36fc
commit 705c65c86c
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 55 additions and 11 deletions

View File

@ -851,6 +851,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
if (latestListedEntryTimestampThisCycleMillis != null) {
final boolean processedNewFiles = entitiesListed > 0;
if (processedNewFiles) {
// If there have been files created, update the last timestamp we processed.
// Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
// because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
// If it didn't change, we need to add identifiers.
latestIdentifiersProcessed.clear();
}
// Capture latestIdentifierProcessed.
latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
}
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
@ -870,17 +884,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (processedNewFiles) {
// If there have been files created, update the last timestamp we processed.
// Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
// because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
// If it didn't change, we need to add identifiers.
latestIdentifiersProcessed.clear();
}
// Capture latestIdentifierProcessed.
latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
session.commitAsync();
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
@ -115,6 +116,46 @@ public class TestAbstractListProcessor {
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
@Test
public void testStateMigratedWhenPrimaryNodeSwitch() throws IOException {
// add a few entities
for (int i=0; i < 5; i++) {
proc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
}
// Add an entity with a later timestamp
proc.addEntity("10", "10", 99999999L);
// Run the processor. All 6 should be listed.
runner.run();
runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 6);
// Now, we want to mimic Primary Node changing. To do so, we'll capture the Cluster State from the State Manager,
// create a new Processor, and set the state to be the same, and update the processor in order to produce the same listing.
final ConcreteListProcessor secondProc = new ConcreteListProcessor();
// Add same listing to the new processor
for (int i=0; i < 5; i++) {
secondProc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
}
secondProc.addEntity("10", "10", 99999999L);
// Create new runner for the second processor and update its state to match that of the last TestRunner.
final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
runner = TestRunners.newTestRunner(secondProc);
runner.getStateManager().setState(stateMap.toMap(), Scope.CLUSTER);
// Run several times, ensuring that nothing is emitted.
for (int i=0; i < 10; i++) {
runner.run();
runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0);
}
// Add one more entry and ensure that it's emitted.
secondProc.addEntity("new", "new", 999999990L);
runner.run();
runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1);
}
@Test
public void testStateMigratedFromCacheService() throws InitializationException {