NIFI-12594: ListS3 - observe min/max object age when entity state tracking is used

This closes #8231.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
p-kimberley 2024-01-11 00:59:59 +11:00 committed by Peter Turcsanyi
parent f1cac06f2a
commit 3ebad40fae
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 86 additions and 99 deletions

View File

@ -161,11 +161,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
.dependsOn(LISTING_STRATEGY, BY_ENTITIES)
.required(true)
.build();
public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
.dependsOn(INITIAL_LISTING_TARGET, ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW)
.dependsOn(ListedEntityTracker.TRACKING_STATE_CACHE)
.required(true)
.build();
@ -289,8 +290,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
AWS_CREDENTIALS_PROVIDER_SERVICE,
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
INITIAL_LISTING_TARGET,
TRACKING_TIME_WINDOW,
INITIAL_LISTING_TARGET,
RECORD_WRITER,
MIN_AGE,
MAX_AGE,
@ -321,6 +322,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetEntityTrackingState = false;
private volatile ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> listedEntityTracker;
private volatile Long minObjectAgeMilliseconds;
private volatile Long maxObjectAgeMilliseconds;
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
@ -346,6 +349,9 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
} else {
listedEntityTracker = null;
}
minObjectAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
maxObjectAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
}
protected ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> createListedEntityTracker() {
@ -356,7 +362,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
boolean requesterPays = Boolean.valueOf(input);
boolean requesterPays = Boolean.parseBoolean(input);
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
boolean valid = !requesterPays || !useVersions;
return new ValidationResult.Builder()
@ -407,7 +413,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
private void restoreState(final ProcessSession session) throws IOException {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (!stateMap.getStateVersion().isPresent() || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX+"0") == null) {
if (stateMap.getStateVersion().isEmpty() || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX + "0") == null) {
forcefullyUpdateListing(0L, Collections.emptySet());
} else {
final long timestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
@ -466,26 +472,21 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
final AmazonS3 client = getClient(context);
S3BucketLister bucketLister = getS3BucketLister(context, client);
final long startNanos = System.nanoTime();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
final long listingTimestamp = System.currentTimeMillis();
final S3BucketLister bucketLister = getS3BucketLister(context, client);
final String bucket = context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ListingSnapshot currentListing = listing.get();
final long currentTimestamp = currentListing.getTimestamp();
final long startNanos = System.nanoTime();
final long currentTimestamp = System.currentTimeMillis();
final long listingTimestamp = currentListing.getTimestamp();
final Set<String> currentKeys = currentListing.getKeys();
int listCount = 0;
int totalListCount = 0;
long latestListedTimestampInThisCycle = currentTimestamp;
long latestListedTimestampInThisCycle = listingTimestamp;
final Set<String> listedKeys = new HashSet<>();
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{currentTimestamp, listingTimestamp, currentKeys});
final S3ObjectWriter writer;
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@ -499,23 +500,20 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
writer.beginListing();
do {
VersionListing versionListing = bucketLister.listVersions();
final VersionListing versionListing = bucketLister.listVersions();
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
|| (maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
final long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < listingTimestamp
|| lastModified == listingTimestamp && currentKeys.contains(versionSummary.getKey())
|| !includeObjectInListing(versionSummary, currentTimestamp)) {
continue;
}
getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
// Write the entity to the listing
final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
// Track the latest lastModified timestamp and keys having that timestamp.
@ -531,8 +529,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
listCount++;
}
bucketLister.setNextMarker();
bucketLister.setNextMarker();
totalListCount += listCount;
if (listCount >= batchSize && writer.isCheckpoint()) {
@ -553,7 +551,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
final Set<String> updatedKeys = new HashSet<>();
if (latestListedTimestampInThisCycle <= currentTimestamp) {
if (latestListedTimestampInThisCycle <= listingTimestamp) {
updatedKeys.addAll(currentKeys);
}
updatedKeys.addAll(listedKeys);
@ -577,10 +575,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
S3BucketLister bucketLister = getS3BucketLister(context, getClient(context));
final long currentTime = System.currentTimeMillis();
List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
return bucketLister.listVersions().getVersionSummaries()
.stream()
.filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
.filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList
&& includeObjectInListing(s3VersionSummary, currentTime))
.map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
s3VersionSummary,
S3VersionSummary::getKey,
@ -589,8 +589,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
S3VersionSummary::getSize
))
.collect(Collectors.toList());
return listedEntities;
}, null);
justElectedPrimaryNode = false;
@ -637,10 +635,9 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
AmazonS3Client s3Client = getClient(context);
GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
final AmazonS3Client s3Client = getClient(context);
final GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
listCount++;
@ -660,7 +657,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
writer.finishListingExceptionally(e);
session.rollback();
context.yield();
return;
}
}
}
@ -732,7 +728,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
public class S3ObjectBucketLister implements S3BucketLister {
private AmazonS3 client;
private final AmazonS3 client;
private ListObjectsRequest listObjectsRequest;
private ObjectListing objectListing;
@ -762,22 +758,11 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjects(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
objectListing = client.listObjects(listObjectsRequest);
final VersionListing versionListing = new VersionListing();
versionListing.setVersionSummaries(objectListing.getObjectSummaries().stream()
.map(ListS3.this::objectSummaryToVersionSummary)
.collect(Collectors.toList()));
return versionListing;
}
@ -788,12 +773,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public boolean isTruncated() {
return (objectListing == null) ? false : objectListing.isTruncated();
return objectListing != null && objectListing.isTruncated();
}
}
public class S3ObjectBucketListerVersion2 implements S3BucketLister {
private AmazonS3 client;
private final AmazonS3 client;
private ListObjectsV2Request listObjectsRequest;
private ListObjectsV2Result objectListing;
@ -823,22 +808,11 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjectsV2(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
objectListing = client.listObjectsV2(listObjectsRequest);
final VersionListing versionListing = new VersionListing();
versionListing.setVersionSummaries(objectListing.getObjectSummaries().stream()
.map(ListS3.this::objectSummaryToVersionSummary)
.collect(Collectors.toList()));
return versionListing;
}
@ -849,12 +823,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public boolean isTruncated() {
return (objectListing == null) ? false : objectListing.isTruncated();
return objectListing != null && objectListing.isTruncated();
}
}
public class S3VersionBucketLister implements S3BucketLister {
private AmazonS3 client;
private final AmazonS3 client;
private ListVersionsRequest listVersionsRequest;
private VersionListing versionListing;
@ -896,7 +870,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public boolean isTruncated() {
return (versionListing == null) ? false : versionListing.isTruncated();
return versionListing != null && versionListing.isTruncated();
}
}
@ -944,13 +918,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
RECORD_SCHEMA = new SimpleRecordSchema(fields);
}
private final ProcessSession session;
private final RecordSetWriterFactory writerFactory;
private final ComponentLog logger;
private final String region;
private RecordSetWriter recordWriter;
private FlowFile flowFile;
private String region;
public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger, final String region) {
this.session = session;
@ -1042,8 +1015,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
}
static class AttributeObjectWriter implements S3ObjectWriter {
private final ProcessSession session;
@ -1062,14 +1033,17 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
attributes.put("s3.region", region);
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(versionSummary.getLastModified().getTime()));
attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
@ -1131,8 +1105,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, logger, attributes));
final String bucketName = context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions(attributes).getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@ -1144,36 +1116,28 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return results;
}
final S3BucketLister bucketLister = getS3BucketLister(context, client);
final long listingTimestamp = System.currentTimeMillis();
// Attempt to perform a listing of objects in the S3 bucket
try {
int listCount = 0;
int totalListCount = 0;
VersionListing versionListing;
final S3BucketLister bucketLister = getS3BucketLister(context, client);
int totalItems = 0;
int totalMatchingItems = 0;
do {
versionListing = bucketLister.listVersions();
final VersionListing versionListing = bucketLister.listVersions();
final long currentTime = System.currentTimeMillis();
for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if ((maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
continue;
totalItems++;
if (includeObjectInListing(versionSummary, currentTime)) {
totalMatchingItems++;
}
listCount++;
}
bucketLister.setNextMarker();
totalListCount += listCount;
listCount = 0;
} while (bucketLister.isTruncated());
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Perform Listing")
.outcome(Outcome.SUCCESSFUL)
.explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalListCount + " objects matching the filter")
.explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalItems + " total object(s). "
+ totalMatchingItems + " objects matched the filter.")
.build());
logger.info("Successfully verified configuration");
@ -1189,4 +1153,27 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return results;
}
/**
 * Decides whether an object belongs in the listing, based on the minimum and maximum object age.
 * A bound whose backing field is {@code null} is not enforced; both bounds are inclusive.
 *
 * @param versionSummary the object (version) summary whose last-modified time is evaluated
 * @param currentTimeMillis the reference "now", in epoch milliseconds, against which the age is measured
 * @return {@code true} if the object's age lies within the configured bounds, {@code false} otherwise
 */
private boolean includeObjectInListing(final S3VersionSummary versionSummary, final long currentTimeMillis) {
    // Read each volatile field exactly once, so the null check and the unboxing observe the same value.
    final Long minAgeMillis = minObjectAgeMilliseconds;
    final Long maxAgeMillis = maxObjectAgeMilliseconds;

    // Compare the age itself instead of (lastModified + bound): that sum can overflow for a very
    // large configured age and would then wrongly exclude the object.
    final long objectAgeMillis = currentTimeMillis - versionSummary.getLastModified().getTime();

    return (minAgeMillis == null || objectAgeMillis >= minAgeMillis)
            && (maxAgeMillis == null || objectAgeMillis <= maxAgeMillis);
}
/**
 * Builds an {@link S3VersionSummary} carrying the same bucket name, key, ETag, last-modified time,
 * owner, size and storage class as the given {@link S3ObjectSummary}, and flags it as the latest version.
 * No version id is set on the result.
 *
 * @param objectSummary the object summary to copy from
 * @return a newly created version summary mirroring {@code objectSummary}
 */
private S3VersionSummary objectSummaryToVersionSummary(final S3ObjectSummary objectSummary) {
    final S3VersionSummary converted = new S3VersionSummary();

    // Where the object lives
    converted.setBucketName(objectSummary.getBucketName());
    converted.setKey(objectSummary.getKey());

    // Object details
    converted.setETag(objectSummary.getETag());
    converted.setLastModified(objectSummary.getLastModified());
    converted.setSize(objectSummary.getSize());
    converted.setStorageClass(objectSummary.getStorageClass());
    converted.setOwner(objectSummary.getOwner());

    // The source summary has no version information here; the result is always marked as latest.
    converted.setIsLatest(true);

    return converted;
}
}

View File

@ -132,7 +132,7 @@ public class TestListS3 {
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(1).getOutcome());
assertTrue(results.get(1).getExplanation().contains("finding 3 objects"));
assertTrue(results.get(1).getExplanation().contains("finding 3 total object(s)"));
}
@Test