NIFI-3332: ListXXX to not miss files with the latest processed timestamp

Before this fix, it's possible that ListXXX processors can miss files those have the same timestamp as the one which was the latest processed timestamp at the previous cycle. Since it only used timestamps, it was not possible to determine whether a file is already processed or not.

However, storing every single processed identifier as we used to will not perform well.
Instead, this commit makes ListXXX to store only identifiers those have the latest timestamp at a cycle to minimize the amount of state data to store.

NIFI-3332: ListXXX to not miss files with the latest processed timestamp

- Fixed TestAbstractListProcessor to use appropriate time precision.
  Without this fix, arbitrary test can fail if generated timestamp does
  not have the desired time unit value, e.g. generated '10:51:00' where
  second precision is tested.
- Fixed TestFTP.basicFileList to use millisecond time precision explicitly
  because FakeFtpServer's time precision is in minutes.
- Changed junit dependency scope to 'provided' as it is needed by
  ListProcessorTestWatcher which is shared among different modules.

This closes #1975.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2017-07-04 17:34:31 +09:00 committed by Bryan Bende
parent 28ee70222b
commit e68ff153e8
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 137 additions and 26 deletions

View File

@ -68,9 +68,10 @@
<scope>test</scope>
</dependency>
<dependency>
<!-- Dependency marked as provided, not test, because ListProcessorTestWatcher uses TestWatcher -->
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -32,6 +32,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@ -178,6 +179,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
private volatile Long lastRunTimeNanos = 0L;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetState = false;
private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();
/*
* A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
@ -194,6 +196,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
static final String IDENTIFIER_PREFIX = "id";
public File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
@ -307,6 +310,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// if the local file's latest timestamp is beyond that of the value provided from the cache, replace
if (minTimestamp == null || localTimestamp > minTimestamp) {
minTimestamp = localTimestamp;
latestIdentifiersProcessed.clear();
latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
}
}
@ -317,16 +322,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (minTimestamp != null) {
persist(minTimestamp, minTimestamp, stateManager, scope);
persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, stateManager, scope);
}
}
private void persist(final long latestListedEntryTimestampThisCycleMillis,
final long lastProcessedLatestEntryTimestampMillis,
final List<String> processedIdentifiesWithLatestTimestamp,
final StateManager stateManager, final Scope scope) throws IOException {
final Map<String, String> updatedState = new HashMap<>(1);
final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) {
updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i));
}
stateManager.setState(updatedState, scope);
}
@ -350,20 +359,28 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// Attempt to retrieve state from the state manager if a last listing was not yet established or
// if just elected the primary node
final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
final String lastProcessedLatestEntryTimestampString= stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
if (lastProcessedLatestEntryTimestampString != null) {
this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
latestIdentifiersProcessed.clear();
for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
final String k = state.getKey();
final String v = state.getValue();
if (v == null || v.isEmpty()) {
continue;
}
if (latestListedEntryTimestampString != null) {
minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
minTimestampToListMillis = Long.parseLong(v);
// If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
if (minTimestampToListMillis == this.lastListedLatestEntryTimestampMillis) {
if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
context.yield();
return;
} else {
this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
}
} else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
} else if (k.startsWith(IDENTIFIER_PREFIX)) {
latestIdentifiersProcessed.add(v);
}
}
justElectedPrimaryNode = false;
} catch (final IOException ioe) {
@ -405,7 +422,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
}
// New entries are all those that occur at or after the associated timestamp
final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis;
if (newEntry) {
List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@ -439,7 +456,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
* - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
*/
final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
|| (latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
&& orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
.allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
context.yield();
return;
}
@ -456,9 +476,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
List<T> entities = timestampEntities.getValue();
if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// Filter out previously processed entities.
entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
}
for (List<T> timestampEntities : orderedEntries.values()) {
for (T entity : timestampEntities) {
for (T entity : entities) {
// Create the FlowFile for this path.
final Map<String, String> attributes = createAttributes(entity, context);
FlowFile flowFile = session.create();
@ -476,6 +501,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// If there have been files created, update the last timestamp we processed.
// Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
// because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
// If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
// If it didn't change, we need to add identifiers.
latestIdentifiersProcessed.clear();
}
// Capture latestIdentifierProcessed.
latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
session.commit();
@ -494,7 +526,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
// the distributed state cache, the node can continue to run (if it is primary node).
try {
lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, context.getStateManager(), getStateScope(context));
} catch (final IOException ioe) {
getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ "if another node begins executing this Processor, data duplication may occur.", ioe);
@ -518,6 +550,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
lastListedLatestEntryTimestampMillis = null;
lastProcessedLatestEntryTimestampMillis = 0L;
lastRunTimeNanos = 0L;
latestIdentifiersProcessed.clear();
}
/**

View File

@ -17,6 +17,10 @@
package org.apache.nifi.processor.util.list;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
import static org.junit.Assert.assertEquals;
import java.io.File;
@ -130,6 +134,8 @@ public class TestAbstractListProcessor {
final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
setTargetSystemTimestampPrecision(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
@ -182,6 +188,8 @@ public class TestAbstractListProcessor {
final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
setTargetSystemTimestampPrecision(targetPrecision);
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
proc.addEntity("name", "id", initialTimestamp);
proc.addEntity("name", "id2", initialTimestamp);
@ -226,6 +234,20 @@ public class TestAbstractListProcessor {
runner.clearTransferState();
}
private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
switch (targetPrecision) {
case MINUTES:
runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES);
break;
case SECONDS:
runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS);
break;
case MILLISECONDS:
runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS);
break;
}
}
@Test
public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
@ -257,6 +279,8 @@ public class TestAbstractListProcessor {
final Map<String, String> preexistingState = new HashMap<>();
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2");
runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
// run for the first time
@ -324,6 +348,7 @@ public class TestAbstractListProcessor {
// Ensure only timestamp is migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@ -383,9 +408,10 @@ public class TestAbstractListProcessor {
// Verify the state manager now maintains the associated state
final Map<String, String> expectedState = new HashMap<>();
// Ensure only timestamp is migrated
// Ensure timestamp and identifies are migrated
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
@ -462,10 +488,12 @@ public class TestAbstractListProcessor {
assertEquals(2, stateMap.getVersion());
final Map<String, String> map = stateMap.toMap();
// Ensure only timestamp is migrated
assertEquals(2, map.size());
// Ensure timestamp and identifiers are migrated
assertEquals(4, map.size());
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1"));
proc.addEntity("new name", "new id", initialTimestamp + 1);
runner.run();
@ -476,10 +504,11 @@ public class TestAbstractListProcessor {
StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals(3, updatedStateMap.getVersion());
assertEquals(2, updatedStateMap.toMap().size());
assertEquals(3, updatedStateMap.toMap().size());
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
// Processed timestamp is now caught up
assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
}
private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
@ -41,6 +42,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -201,7 +203,7 @@ public class TestFTP {
}
@Test
public void basicFileList() throws IOException {
public void basicFileList() throws IOException, InterruptedException {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
@ -217,10 +219,16 @@ public class TestFTP {
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
runner.setProperty(ListFTP.REMOTE_PATH, "/");
// FakeFTPServer has timestamp precision in minutes.
// Specify milliseconds precision so that test does not need to wait for minutes.
runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
runner.assertValid();
// Ensure wait for enough lag time.
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
runner.run();
runner.assertTransferCount(FetchFTP.REL_SUCCESS, 1);
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
runner.assertAllFlowFilesContainAttribute("ftp.remote.port");

View File

@ -33,8 +33,10 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -646,6 +648,44 @@ public class TestListFile {
assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
}
private void makeTestFile(final String name, final long millis, final Map<String, Long> fileTimes) throws IOException {
final File file = new File(TESTDIR + name);
assertTrue(file.createNewFile());
assertTrue(file.setLastModified(millis));
fileTimes.put(file.getName(), file.lastModified());
}
@Test
public void testFilterRunMidFileWrites() throws Exception {
final Map<String, Long> fileTimes = new HashMap<>();
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
makeTestFile("/batch1-age3.txt", time3millis, fileTimes);
makeTestFile("/batch1-age4.txt", time4millis, fileTimes);
makeTestFile("/batch1-age5.txt", time5millis, fileTimes);
// check files
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
assertEquals(3, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
// should be picked since it's newer than age3
makeTestFile("/batch2-age2.txt", time2millis, fileTimes);
// should be picked even if it has the same age3 timestamp, because it wasn't there at the previous cycle.
makeTestFile("/batch2-age3.txt", time3millis, fileTimes);
// should be ignored since it's older than age3
makeTestFile("/batch2-age4.txt", time4millis, fileTimes);
runNext();
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
assertEquals(2, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
}
/*
* HFS+, default for OS X, only has granularity to one second, accordingly, we go back in time to establish consistent test cases
*