NIFI-9317: Updating config verification for ListS3 (#5485)

This commit is contained in:
Joe Gresock 2021-10-27 13:47:58 -04:00 committed by GitHub
parent 0eee70c4b9
commit ca530f40d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 37 deletions

View File

@ -303,17 +303,24 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
protected void initializeRegionAndEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
this.region = getRegionAndInitializeEndpoint(context, client);
}
protected Region getRegionAndInitializeEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
final Region region;
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String region = context.getProperty(REGION).getValue();
if (region != null) {
this.region = Region.getRegion(Regions.fromName(region));
final String regionValue = context.getProperty(REGION).getValue();
if (regionValue != null) {
region = Region.getRegion(Regions.fromName(regionValue));
if (client != null) {
client.setRegion(this.region);
client.setRegion(region);
}
} else {
this.region = null;
region = null;
}
} else {
region = null;
}
// if the endpoint override has been configured, set the endpoint.
@ -328,8 +335,8 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
// handling vpce endpoints
// falling back to the configured region if the parse fails
// e.g. in case of https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
String region = parseRegionForVPCE(urlstr, this.region.getName());
client.setEndpoint(urlstr, this.client.getServiceName(), region);
String regionValue = parseRegionForVPCE(urlstr, region.getName());
client.setEndpoint(urlstr, this.client.getServiceName(), regionValue);
} else {
// handling non-vpce custom endpoints where the AWS library can parse the region out
// e.g. https://sqs.{region}.***.***.***.gov
@ -337,6 +344,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
}
}
}
return region;
}
/*

View File

@ -53,6 +53,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -342,41 +343,23 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return;
}
final AmazonS3 client = getClient();
S3BucketLister bucketLister = getS3BucketLister(context, client);
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long listingTimestamp = System.currentTimeMillis();
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ListingSnapshot currentListing = listing.get();
final long currentTimestamp = currentListing.getTimestamp();
final Set<String> currentKeys = currentListing.getKeys();
final AmazonS3 client = getClient();
int listCount = 0;
int totalListCount = 0;
long latestListedTimestampInThisCycle = currentTimestamp;
String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
int listType = context.getProperty(LIST_TYPE).asInteger();
S3BucketLister bucketLister = useVersions
? new S3VersionBucketLister(client)
: listType == 2
? new S3ObjectBucketListerVersion2(client)
: new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
bucketLister.setRequesterPays(requesterPays);
if (delimiter != null && !delimiter.isEmpty()) {
bucketLister.setDelimiter(delimiter);
}
if (prefix != null && !prefix.isEmpty()) {
bucketLister.setPrefix(prefix);
}
VersionListing versionListing;
final Set<String> listedKeys = new HashSet<>();
@ -486,6 +469,33 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
}
private S3BucketLister getS3BucketLister(final ProcessContext context, final AmazonS3 client) {
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
final boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final String delimiter = context.getProperty(DELIMITER).getValue();
final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
final int listType = context.getProperty(LIST_TYPE).asInteger();
final S3BucketLister bucketLister = useVersions
? new S3VersionBucketLister(client)
: listType == 2
? new S3ObjectBucketListerVersion2(client)
: new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
bucketLister.setRequesterPays(requesterPays);
if (delimiter != null && !delimiter.isEmpty()) {
bucketLister.setDelimiter(delimiter);
}
if (prefix != null && !prefix.isEmpty()) {
bucketLister.setPrefix(prefix);
}
return bucketLister;
}
private interface S3BucketLister {
void setBucketName(String bucketName);
@ -891,11 +901,15 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
final AmazonS3Client client = createClient(context, getCredentials(context), createConfiguration(context));
initializeRegionAndEndpoint(context, client);
final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
final AmazonS3Client client = service != null ? createClient(context, getCredentialsProvider(context), createConfiguration(context))
: createClient(context, getCredentials(context), createConfiguration(context));
getRegionAndInitializeEndpoint(context, client);
final List<ConfigVerificationResult> results = new ArrayList<>();
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@ -907,17 +921,35 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
return results;
}
final String prefix = context.getProperty(PREFIX).getValue();
final S3BucketLister bucketLister = getS3BucketLister(context, client);
final long listingTimestamp = System.currentTimeMillis();
// Attempt to perform a listing of objects in the S3 bucket
try {
final ObjectListing listing = client.listObjects(bucketName, prefix);
final int count = listing.getObjectSummaries().size();
int listCount = 0;
int totalListCount = 0;
VersionListing versionListing;
do {
versionListing = bucketLister.listVersions();
for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = versionSummary.getLastModified().getTime();
if (lastModified > (listingTimestamp - minAgeMilliseconds)) {
continue;
}
listCount++;
}
bucketLister.setNextMarker();
totalListCount += listCount;
listCount = 0;
} while (bucketLister.isTruncated());
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Perform Listing")
.outcome(Outcome.SUCCESSFUL)
.explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + count + " objects" + (prefix == null ? "" : " with a prefix of '" + prefix + "'"))
.explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalListCount + " objects matching the filter")
.build());
logger.info("Successfully verified configuration");

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@ -28,7 +30,10 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.state.MockStateManager;
@ -44,6 +49,7 @@ import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -66,6 +72,11 @@ public class TestListS3 {
protected AmazonS3Client getClient() {
return mockS3Client;
}
@Override
protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockListS3);
}
@ -114,6 +125,10 @@ public class TestListS3 {
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertTrue(results.get(0).getExplanation().contains("finding 3 objects"));
}
@Test