NIFI-10349 Add Maximum Object Age property to ListS3

This closes #6293.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Marco Carlino 2022-08-11 17:31:21 +02:00 committed by Peter Turcsanyi
parent 2acfd1e496
commit 6ce2e3799c
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
2 changed files with 80 additions and 1 deletions

View File

@ -81,6 +81,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.FormatUtils;
import java.io.IOException;
import java.io.OutputStream;
@ -220,6 +221,15 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
.defaultValue("0 sec")
.build();
/**
 * Optional "Maximum Object Age" property.
 *
 * The value must first pass the standard time-period validator and then the
 * validator returned by {@link #createMaxAgeValidator()}; the validators are
 * registered in that order.
 */
public static final PropertyDescriptor MAX_AGE = new Builder()
        .required(false)
        .name("max-age")
        .displayName("Maximum Object Age")
        .description("The maximum age that an S3 object can be in order to be considered; any object older than this amount of time (according to last modification date) will be ignored")
        // Syntax check first, then the cross-property check against the minimum age.
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .addValidator(createMaxAgeValidator())
        .build();
public static final PropertyDescriptor WRITE_OBJECT_TAGS = new Builder()
.name("write-s3-object-tags")
.displayName("Write Object Tags")
@ -284,6 +294,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
SECRET_KEY,
RECORD_WRITER,
MIN_AGE,
MAX_AGE,
BATCH_SIZE,
WRITE_OBJECT_TAGS,
WRITE_USER_METADATA,
@ -362,6 +373,22 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
};
}
/**
 * Creates the validator that compares the configured maximum age with the
 * value of {@code MIN_AGE}.
 *
 * The produced validator reports the input as valid only when it is non-null
 * and its duration in milliseconds is strictly greater than the minimum age
 * in milliseconds.
 *
 * @return a validator for the maximum-age property value
 */
private static Validator createMaxAgeValidator() {
    return new Validator() {
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
            // Parse the candidate value before reading the minimum age, so the
            // two lookups happen in the same order as they always have.
            final Double maxAgeMillis = (input == null)
                    ? null
                    : FormatUtils.getPreciseTimeDuration(input, TimeUnit.MILLISECONDS);
            final long minAgeMillis = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);

            final boolean acceptable;
            if (input == null) {
                acceptable = false;
            } else {
                // Strict comparison: a maximum equal to the minimum is rejected.
                acceptable = maxAgeMillis > minAgeMillis;
            }

            final ValidationResult.Builder result = new ValidationResult.Builder();
            result.input(input);
            result.subject(subject);
            result.valid(acceptable);
            result.explanation(acceptable ? null : "'Maximum Age' must be greater than 'Minimum Age' ");
            return result.build();
        }
    };
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -449,6 +476,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
final long startNanos = System.nanoTime();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
final long listingTimestamp = System.currentTimeMillis();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
@ -481,6 +509,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
|| (maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
continue;
}
@ -1103,6 +1132,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, logger, attributes));
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@ -1126,7 +1156,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
versionListing = bucketLister.listVersions();
for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified > (listingTimestamp - minAgeMilliseconds)) {
if ((maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
continue;
}

View File

@ -463,6 +463,54 @@ public class TestListS3 {
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
}
/**
 * Lists a bucket holding objects modified three hours ago, one hour ago and
 * just now, with a maximum age of 30 seconds, and expects only the object
 * modified just now to be emitted.
 */
@Test
public void testListIgnoreByMaxAge() throws IOException {
    runner.setProperty(ListS3.REGION, "eu-west-1");
    runner.setProperty(ListS3.BUCKET, "test-bucket");
    runner.setProperty(ListS3.MAX_AGE, "30 sec");

    final Date now = new Date();
    final Date oneHourAgo = DateUtils.addHours(now, -1);
    final Date threeHoursAgo = DateUtils.addHours(now, -3);

    // Fixture: one summary per (key, last-modified) pair, added oldest first.
    final String[] keys = {"minus-3hour", "minus-1hour", "now"};
    final Date[] modifiedDates = {threeHoursAgo, oneHourAgo, now};
    final ObjectListing listing = new ObjectListing();
    for (int i = 0; i < keys.length; i++) {
        final S3ObjectSummary summary = new S3ObjectSummary();
        summary.setBucketName("test-bucket");
        summary.setKey(keys[i]);
        summary.setLastModified(modifiedDates[i]);
        listing.getObjectSummaries().add(summary);
    }
    Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(listing);

    // Seed the stored state as if the oldest object had already been listed.
    final Map<String, String> priorState = new HashMap<>();
    priorState.put(ListS3.CURRENT_TIMESTAMP, String.valueOf(threeHoursAgo.getTime()));
    priorState.put(ListS3.CURRENT_KEY_PREFIX + "0", "minus-3hour");
    runner.getStateManager().setState(priorState, Scope.CLUSTER);

    runner.run();

    // Exactly one plain object listing against the bucket, and no version listing.
    final ArgumentCaptor<ListObjectsRequest> requestCaptor = ArgumentCaptor.forClass(ListObjectsRequest.class);
    Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(requestCaptor.capture());
    assertEquals("test-bucket", requestCaptor.getValue().getBucketName());
    Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());

    // Only the freshly modified object is expected to come out.
    runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
    final MockFlowFile listed = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
    final String expectedTimestamp = String.valueOf(now.getTime());
    listed.assertAttributeEquals("filename", "now");
    listed.assertAttributeEquals("s3.bucket", "test-bucket");
    listed.assertAttributeEquals("s3.lastModified", expectedTimestamp);
    runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, expectedTimestamp, Scope.CLUSTER);
}
@Test
public void testWriteObjectTags() {
runner.setProperty(ListS3.REGION, "eu-west-1");