NIFI-4715: Update currentKeys after listing loop

ListS3 used to update currentKeys within listing loop, that causes
    duplicates. Because S3 returns object list in lexicographic order, if we
    clear currentKeys during the loop, we cannot tell if the object has been
    listed or not, in a case where newer object has a lexicographically
    former name.

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #3116, closes #2361.
This commit is contained in:
Koji Kawamura 2018-10-31 16:01:36 +09:00 committed by James Wing
parent 0a014dcdb1
commit 37a0e1b304
1 changed files with 23 additions and 9 deletions

View File

@ -230,7 +230,7 @@ public class ListS3 extends AbstractS3Processor {
final AmazonS3 client = getClient();
int listCount = 0;
int totalListCount = 0;
long maxTimestamp = currentTimestamp;
long latestListedTimestampInThisCycle = currentTimestamp;
String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
@ -252,6 +252,9 @@ public class ListS3 extends AbstractS3Processor {
}
VersionListing versionListing;
final Set<String> listedKeys = new HashSet<>();
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
do {
versionListing = bucketLister.listVersions();
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
@ -262,6 +265,8 @@ public class ListS3 extends AbstractS3Processor {
continue;
}
getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
@ -287,14 +292,17 @@ public class ListS3 extends AbstractS3Processor {
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
currentKeys.clear();
}
if (lastModified == maxTimestamp) {
currentKeys.add(versionSummary.getKey());
// Track the latest lastModified timestamp and keys having that timestamp.
// NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
if (lastModified > latestListedTimestampInThisCycle) {
latestListedTimestampInThisCycle = lastModified;
listedKeys.clear();
listedKeys.add(versionSummary.getKey());
} else if (lastModified == latestListedTimestampInThisCycle) {
listedKeys.add(versionSummary.getKey());
}
listCount++;
}
bucketLister.setNextMarker();
@ -304,8 +312,14 @@ public class ListS3 extends AbstractS3Processor {
listCount = 0;
} while (bucketLister.isTruncated());
// Update currentKeys.
if (latestListedTimestampInThisCycle > currentTimestamp) {
currentKeys.clear();
}
currentKeys.addAll(listedKeys);
// Update stateManger with the most recent timestamp
currentTimestamp = maxTimestamp;
currentTimestamp = latestListedTimestampInThisCycle;
persistState(context);
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);