NIFI-4715: ListS3 produces duplicates in frequently updated buckets

Keep totalListCount, reduce unnecessary persistState

This closes #2361.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Adam Lamar 2017-12-23 20:29:02 -07:00 committed by James Wing
parent 2812fe60a2
commit 0a014dcdb1
1 changed files with 7 additions and 6 deletions

View File

@ -229,7 +229,8 @@ public class ListS3 extends AbstractS3Processor {
final AmazonS3 client = getClient();
int listCount = 0;
long maxTimestamp = 0L;
int totalListCount = 0;
long maxTimestamp = currentTimestamp;
String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
@ -298,18 +299,19 @@ public class ListS3 extends AbstractS3Processor {
}
bucketLister.setNextMarker();
totalListCount += listCount;
commit(context, session, listCount);
listCount = 0;
} while (bucketLister.isTruncated());
// Update stateManger with the most recent timestamp
currentTimestamp = maxTimestamp;
persistState(context);
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
if (!commit(context, session, listCount)) {
if (currentTimestamp > 0) {
persistState(context);
}
if (totalListCount == 0) {
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
context.yield();
}
@ -320,7 +322,6 @@ public class ListS3 extends AbstractS3Processor {
if (willCommit) {
getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
session.commit();
persistState(context);
}
return willCommit;
}