NIFI-12594: ListS3 - observe min/max object age when entity state tracking is used

This closes #8231.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>

(cherry picked from commit 3ebad40fae458db3fe664ddd6738b770a26289c8)
This commit is contained in:
p-kimberley 2024-01-10 14:59:59 +01:00 committed by Peter Turcsanyi
parent 037ab2886f
commit b4487a0bf0
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 85 additions and 98 deletions

View File

@ -162,11 +162,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
.dependsOn(LISTING_STRATEGY, BY_ENTITIES)
.required(true)
.build();
public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
.dependsOn(INITIAL_LISTING_TARGET, ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW)
.dependsOn(ListedEntityTracker.TRACKING_STATE_CACHE)
.required(true)
.build();
@ -287,8 +288,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
INITIAL_LISTING_TARGET,
TRACKING_TIME_WINDOW,
INITIAL_LISTING_TARGET,
BUCKET,
REGION,
ACCESS_KEY,
@ -329,6 +330,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetEntityTrackingState = false;
private volatile ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> listedEntityTracker;
private volatile Long minObjectAgeMilliseconds;
private volatile Long maxObjectAgeMilliseconds;
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
@ -354,6 +357,9 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
} else {
listedEntityTracker = null;
}
minObjectAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
maxObjectAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
}
protected ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> createListedEntityTracker() {
@ -364,7 +370,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
boolean requesterPays = Boolean.valueOf(input);
boolean requesterPays = Boolean.parseBoolean(input);
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
boolean valid = !requesterPays || !useVersions;
return new ValidationResult.Builder()
@ -474,26 +480,21 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
final AmazonS3 client = getClient(context);
S3BucketLister bucketLister = getS3BucketLister(context, client);
final long startNanos = System.nanoTime();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
final long listingTimestamp = System.currentTimeMillis();
final S3BucketLister bucketLister = getS3BucketLister(context, client);
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ListingSnapshot currentListing = listing.get();
final long currentTimestamp = currentListing.getTimestamp();
final long startNanos = System.nanoTime();
final long currentTimestamp = System.currentTimeMillis();
final long listingTimestamp = currentListing.getTimestamp();
final Set<String> currentKeys = currentListing.getKeys();
int listCount = 0;
int totalListCount = 0;
long latestListedTimestampInThisCycle = currentTimestamp;
long latestListedTimestampInThisCycle = listingTimestamp;
final Set<String> listedKeys = new HashSet<>();
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{currentTimestamp, listingTimestamp, currentKeys});
final S3ObjectWriter writer;
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@ -507,23 +508,20 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
writer.beginListing();
do {
VersionListing versionListing = bucketLister.listVersions();
final VersionListing versionListing = bucketLister.listVersions();
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
|| (maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
final long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < listingTimestamp
|| lastModified == listingTimestamp && currentKeys.contains(versionSummary.getKey())
|| !includeObjectInListing(versionSummary, currentTimestamp)) {
continue;
}
getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
// Write the entity to the listing
final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
// Track the latest lastModified timestamp and keys having that timestamp.
@ -539,8 +537,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
listCount++;
}
bucketLister.setNextMarker();
bucketLister.setNextMarker();
totalListCount += listCount;
if (listCount >= batchSize && writer.isCheckpoint()) {
@ -561,7 +559,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
final Set<String> updatedKeys = new HashSet<>();
if (latestListedTimestampInThisCycle <= currentTimestamp) {
if (latestListedTimestampInThisCycle <= listingTimestamp) {
updatedKeys.addAll(currentKeys);
}
updatedKeys.addAll(listedKeys);
@ -585,10 +583,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
S3BucketLister bucketLister = getS3BucketLister(context, getClient(context));
final long currentTime = System.currentTimeMillis();
List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
return bucketLister.listVersions().getVersionSummaries()
.stream()
.filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
.filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList
&& includeObjectInListing(s3VersionSummary, currentTime))
.map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
s3VersionSummary,
S3VersionSummary::getKey,
@ -597,8 +597,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
S3VersionSummary::getSize
))
.collect(Collectors.toList());
return listedEntities;
}, null);
justElectedPrimaryNode = false;
@ -645,10 +643,9 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
AmazonS3Client s3Client = getClient(context);
GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
final AmazonS3Client s3Client = getClient(context);
final GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
final ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());
listCount++;
@ -668,7 +665,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
writer.finishListingExceptionally(e);
session.rollback();
context.yield();
return;
}
}
}
@ -740,7 +736,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
public class S3ObjectBucketLister implements S3BucketLister {
private AmazonS3 client;
private final AmazonS3 client;
private ListObjectsRequest listObjectsRequest;
private ObjectListing objectListing;
@ -770,22 +766,11 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjects(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
objectListing = client.listObjects(listObjectsRequest);
final VersionListing versionListing = new VersionListing();
versionListing.setVersionSummaries(objectListing.getObjectSummaries().stream()
.map(ListS3.this::objectSummaryToVersionSummary)
.collect(Collectors.toList()));
return versionListing;
}
@ -796,12 +781,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public boolean isTruncated() {
return (objectListing == null) ? false : objectListing.isTruncated();
return objectListing != null && objectListing.isTruncated();
}
}
public class S3ObjectBucketListerVersion2 implements S3BucketLister {
private AmazonS3 client;
private final AmazonS3 client;
private ListObjectsV2Request listObjectsRequest;
private ListObjectsV2Result objectListing;
@ -831,22 +816,11 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
this.objectListing = client.listObjectsV2(listObjectsRequest);
for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
versionListing.getVersionSummaries().add(versionSummary);
}
objectListing = client.listObjectsV2(listObjectsRequest);
final VersionListing versionListing = new VersionListing();
versionListing.setVersionSummaries(objectListing.getObjectSummaries().stream()
.map(ListS3.this::objectSummaryToVersionSummary)
.collect(Collectors.toList()));
return versionListing;
}
@ -857,12 +831,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public boolean isTruncated() {
return (objectListing == null) ? false : objectListing.isTruncated();
return objectListing != null && objectListing.isTruncated();
}
}
public class S3VersionBucketLister implements S3BucketLister {
private AmazonS3 client;
private final AmazonS3 client;
private ListVersionsRequest listVersionsRequest;
private VersionListing versionListing;
@ -904,7 +878,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public boolean isTruncated() {
return (versionListing == null) ? false : versionListing.isTruncated();
return versionListing != null && versionListing.isTruncated();
}
}
@ -952,13 +926,12 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
RECORD_SCHEMA = new SimpleRecordSchema(fields);
}
private final ProcessSession session;
private final RecordSetWriterFactory writerFactory;
private final ComponentLog logger;
private final String region;
private RecordSetWriter recordWriter;
private FlowFile flowFile;
private String region;
public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger, final String region) {
this.session = session;
@ -1050,8 +1023,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
}
static class AttributeObjectWriter implements S3ObjectWriter {
private final ProcessSession session;
@ -1070,14 +1041,17 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
attributes.put("s3.region", region);
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(versionSummary.getLastModified().getTime()));
attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
@ -1139,8 +1113,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, logger, attributes));
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@ -1152,36 +1124,28 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return results;
}
final S3BucketLister bucketLister = getS3BucketLister(context, client);
final long listingTimestamp = System.currentTimeMillis();
// Attempt to perform a listing of objects in the S3 bucket
try {
int listCount = 0;
int totalListCount = 0;
VersionListing versionListing;
final S3BucketLister bucketLister = getS3BucketLister(context, client);
int totalItems = 0;
int totalMatchingItems = 0;
do {
versionListing = bucketLister.listVersions();
final VersionListing versionListing = bucketLister.listVersions();
final long currentTime = System.currentTimeMillis();
for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if ((maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
continue;
totalItems++;
if (includeObjectInListing(versionSummary, currentTime)) {
totalMatchingItems++;
}
listCount++;
}
bucketLister.setNextMarker();
totalListCount += listCount;
listCount = 0;
} while (bucketLister.isTruncated());
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Perform Listing")
.outcome(Outcome.SUCCESSFUL)
.explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalListCount + " objects matching the filter")
.explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalItems + " total object(s). "
+ totalMatchingItems + " objects matched the filter.")
.build());
logger.info("Successfully verified configuration");
@ -1197,4 +1161,27 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return results;
}
/**
* Return whether to include the entity in the listing, based on the minimum and maximum object age (if configured).
*/
private boolean includeObjectInListing(final S3VersionSummary versionSummary, final long currentTimeMillis) {
final long lastModifiedTime = versionSummary.getLastModified().getTime();
return (minObjectAgeMilliseconds == null || currentTimeMillis >= lastModifiedTime + minObjectAgeMilliseconds)
&& (maxObjectAgeMilliseconds == null || currentTimeMillis <= lastModifiedTime + maxObjectAgeMilliseconds);
}
private S3VersionSummary objectSummaryToVersionSummary(final S3ObjectSummary objectSummary) {
final S3VersionSummary versionSummary = new S3VersionSummary();
versionSummary.setBucketName(objectSummary.getBucketName());
versionSummary.setETag(objectSummary.getETag());
versionSummary.setKey(objectSummary.getKey());
versionSummary.setLastModified(objectSummary.getLastModified());
versionSummary.setOwner(objectSummary.getOwner());
versionSummary.setSize(objectSummary.getSize());
versionSummary.setStorageClass(objectSummary.getStorageClass());
versionSummary.setIsLatest(true);
return versionSummary;
}
}

View File

@ -127,7 +127,7 @@ public class TestListS3 {
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(1).getOutcome());
assertTrue(results.get(1).getExplanation().contains("finding 3 objects"));
assertTrue(results.get(1).getExplanation().contains("finding 3 total object(s)"));
}
@Test