NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5567.
This commit is contained in:
Mark Payne 2021-12-03 12:51:16 -05:00 committed by Pierre Villard
parent 41ff6f076b
commit 7a83c32a01
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed file with 8 additions and 8 deletions

View File

@@ -331,7 +331,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
if (writer.isCheckpoint()) {
commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
commit(session, listCount);
listCount = 0;
}
@@ -344,7 +344,12 @@ public class ListGCSBucket extends AbstractGCSProcessor {
getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", bucket);
context.yield();
} else {
commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
commit(session, listCount);
currentTimestamp = maxTimestamp;
currentKeys.clear();
currentKeys.addAll(keysMatchingTimestamp);
persistState(session, currentTimestamp, currentKeys);
}
} catch (final Exception e) {
getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
@@ -358,13 +363,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
}
private void commit(final ProcessSession session, final int listCount, final long timestamp, final Set<String> keysMatchingTimestamp) {
private void commit(final ProcessSession session, final int listCount) {
if (listCount > 0) {
currentTimestamp = timestamp;
currentKeys.clear();
currentKeys.addAll(keysMatchingTimestamp);
persistState(session, currentTimestamp, currentKeys);
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
session.commitAsync();
}