From 0a014dcdb13e30084e6378c14f8c8e5568493c33 Mon Sep 17 00:00:00 2001 From: Adam Lamar Date: Sat, 23 Dec 2017 20:29:02 -0700 Subject: [PATCH] NIFI-4715: ListS3 produces duplicates in frequently updated buckets Keep totalListCount, reduce unnecessary persistState This closes #2361. Signed-off-by: Koji Kawamura --- .../org/apache/nifi/processors/aws/s3/ListS3.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index f5a69acb59..fc3260c8ef 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -229,7 +229,8 @@ public class ListS3 extends AbstractS3Processor { final AmazonS3 client = getClient(); int listCount = 0; - long maxTimestamp = 0L; + int totalListCount = 0; + long maxTimestamp = currentTimestamp; String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); @@ -298,18 +299,19 @@ public class ListS3 extends AbstractS3Processor { } bucketLister.setNextMarker(); + totalListCount += listCount; commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); + + // Update stateManger with the most recent timestamp currentTimestamp = maxTimestamp; + persistState(context); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); - if (!commit(context, session, listCount)) { - if (currentTimestamp > 0) { - persistState(context); - } + if (totalListCount == 0) { getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } @@ -320,7 +322,6 @@ public class ListS3 extends AbstractS3Processor { if (willCommit) { getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount}); session.commit(); - persistState(context); } return willCommit; }