From 37a0e1b3048b5db067b6485bb437887cb0869888 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 31 Oct 2018 16:01:36 +0900 Subject: [PATCH] NIFI-4715: Update currentKeys after listing loop ListS3 used to update currentKeys within listing loop, that causes duplicates. Because S3 returns object list in lexicographic order, if we clear currentKeys during the loop, we cannot tell if the object has been listed or not, in a case where newer object has a lexicographically former name. Signed-off-by: James Wing This closes #3116, closes #2361. --- .../apache/nifi/processors/aws/s3/ListS3.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index fc3260c8ef..d3bade9faa 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -230,7 +230,7 @@ public class ListS3 extends AbstractS3Processor { final AmazonS3 client = getClient(); int listCount = 0; int totalListCount = 0; - long maxTimestamp = currentTimestamp; + long latestListedTimestampInThisCycle = currentTimestamp; String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); @@ -252,6 +252,9 @@ public class ListS3 extends AbstractS3Processor { } VersionListing versionListing; + final Set listedKeys = new HashSet<>(); + getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys}); + do { versionListing = bucketLister.listVersions(); for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { @@ -262,6 +265,8 @@ public class ListS3 extends AbstractS3Processor { continue; } + getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys}); + // Create the attributes final Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey()); @@ -287,14 +292,17 @@ public class ListS3 extends AbstractS3Processor { flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); - // Update state - if (lastModified > maxTimestamp) { - maxTimestamp = lastModified; - currentKeys.clear(); - } - if (lastModified == maxTimestamp) { - currentKeys.add(versionSummary.getKey()); + // Track the latest lastModified timestamp and keys having that timestamp. + // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps. + if (lastModified > latestListedTimestampInThisCycle) { + latestListedTimestampInThisCycle = lastModified; + listedKeys.clear(); + listedKeys.add(versionSummary.getKey()); + + } else if (lastModified == latestListedTimestampInThisCycle) { + listedKeys.add(versionSummary.getKey()); } + listCount++; } bucketLister.setNextMarker(); @@ -304,8 +312,14 @@ public class ListS3 extends AbstractS3Processor { listCount = 0; } while (bucketLister.isTruncated()); + // Update currentKeys. + if (latestListedTimestampInThisCycle > currentTimestamp) { + currentKeys.clear(); + } + currentKeys.addAll(listedKeys); + // Update stateManger with the most recent timestamp - currentTimestamp = maxTimestamp; + currentTimestamp = latestListedTimestampInThisCycle; persistState(context); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);