NIFI-4876 - Adding Min Object Age to ListS3

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2491.
This commit is contained in:
James Wing 2018-02-25 10:41:33 -08:00 committed by Pierre Villard
parent 19b5b60b0f
commit 77dc186097
2 changed files with 69 additions and 3 deletions

View File

@ -124,10 +124,19 @@ public class ListS3 extends AbstractS3Processor {
.description("Specifies whether to use the original List Objects or the newer List Objects Version 2 endpoint.") .description("Specifies whether to use the original List Objects or the newer List Objects Version 2 endpoint.")
.build(); .build();
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.name("min-age")
.displayName("Minimum Object Age")
.description("The minimum age that an S3 object must be in order to be considered; any object younger than this amount of time (according to last modification date) will be ignored")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE)); SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
public static final Set<Relationship> relationships = Collections.unmodifiableSet( public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS))); new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@ -197,6 +206,8 @@ public class ListS3 extends AbstractS3Processor {
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long listingTimestamp = System.currentTimeMillis();
final AmazonS3 client = getClient(); final AmazonS3 client = getClient();
int listCount = 0; int listCount = 0;
@ -227,7 +238,8 @@ public class ListS3 extends AbstractS3Processor {
for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime(); long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) { || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
|| lastModified > (listingTimestamp - minAgeMilliseconds)) {
continue; continue;
} }

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
@ -237,11 +238,63 @@ public class TestListS3 {
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0); runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0);
} }
@Test
public void testListIgnoreByMinAge() throws IOException {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
runner.setProperty(ListS3.MIN_AGE, "30 sec");
Date lastModifiedNow = new Date();
Date lastModifiedMinus1Hour = DateUtils.addHours(lastModifiedNow, -1);
Date lastModifiedMinus3Hour = DateUtils.addHours(lastModifiedNow, -3);
ObjectListing objectListing = new ObjectListing();
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
objectSummary1.setBucketName("test-bucket");
objectSummary1.setKey("minus-3hour");
objectSummary1.setLastModified(lastModifiedMinus3Hour);
objectListing.getObjectSummaries().add(objectSummary1);
S3ObjectSummary objectSummary2 = new S3ObjectSummary();
objectSummary2.setBucketName("test-bucket");
objectSummary2.setKey("minus-1hour");
objectSummary2.setLastModified(lastModifiedMinus1Hour);
objectListing.getObjectSummaries().add(objectSummary2);
S3ObjectSummary objectSummary3 = new S3ObjectSummary();
objectSummary3.setBucketName("test-bucket");
objectSummary3.setKey("now");
objectSummary3.setLastModified(lastModifiedNow);
objectListing.getObjectSummaries().add(objectSummary3);
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
Map<String,String> stateMap = new HashMap<>();
String previousTimestamp = String.valueOf(lastModifiedMinus3Hour.getTime());
stateMap.put(ListS3.CURRENT_TIMESTAMP, previousTimestamp);
stateMap.put(ListS3.CURRENT_KEY_PREFIX + "0", "minus-3hour");
runner.getStateManager().setState(stateMap, Scope.CLUSTER);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "minus-1hour");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
String lastModifiedTimestamp = String.valueOf(lastModifiedMinus1Hour.getTime());
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
}
@Test @Test
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3(); ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 16, pd.size()); assertEquals("size should be eq", 17, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET)); assertTrue(pd.contains(ListS3.BUCKET));
@ -257,5 +310,6 @@ public class TestListS3 {
assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX)); assertTrue(pd.contains(ListS3.PREFIX));
assertTrue(pd.contains(ListS3.USE_VERSIONS)); assertTrue(pd.contains(ListS3.USE_VERSIONS));
assertTrue(pd.contains(ListS3.MIN_AGE));
} }
} }