From ddf8c6f8fa6bf808ef407708d5e70529b721468f Mon Sep 17 00:00:00 2001
From: Adam Lamar
Date: Sat, 5 Mar 2016 22:22:14 -0700
Subject: [PATCH] NIFI-840: Create ListS3 processor

This closes #238

Signed-off-by: Aldrin Piri
---
 .../processors/aws/s3/DeleteS3Object.java     |   2 +-
 .../nifi/processors/aws/s3/FetchS3Object.java |   2 +-
 .../apache/nifi/processors/aws/s3/ListS3.java | 235 ++++++++++++++++++
 .../nifi/processors/aws/s3/PutS3Object.java   |   2 +-
 .../org.apache.nifi.processor.Processor      |   1 +
 .../nifi/processors/aws/s3/AbstractS3IT.java  |   1 +
 .../nifi/processors/aws/s3/ITListS3.java      | 146 +++++++++++
 7 files changed, 386 insertions(+), 3 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index d274f2fa1b..fe7d461c76 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -40,7 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 @SupportsBatching
-@SeeAlso({PutS3Object.class, FetchS3Object.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 4591a391eb..a86baa197b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -46,7 +46,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 
 @SupportsBatching
-@SeeAlso({PutS3Object.class, DeleteS3Object.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
 @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
new file mode 100644
index 0000000000..bc149252bb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"Amazon", "S3", "AWS", "list"})
+@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents " +
+        "the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only " +
+        "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating " +
+        "all of the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, " +
+        "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after " +
+        "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary " +
+        "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
+        @WritesAttribute(attribute = "filename", description = "The name of the file"),
+        @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
+        @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
+        @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
+        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+public class ListS3 extends AbstractS3Processor {
+
+    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
+            .name("delimiter")
+            .displayName("Delimiter")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
+                    "for the correct use of this field.")
+            .build();
+
+    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
+            .name("prefix")
+            .displayName("Prefix")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
+                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
+                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
+
+    public static final String CURRENT_TIMESTAMP = "currentTimestamp";
+    public static final String CURRENT_KEY_PREFIX = "key-";
+
+    // State tracking
+    private long currentTimestamp = 0L;
+    private Set<String> currentKeys;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    private Set<String> extractKeys(final StateMap stateMap) {
+        Set<String> keys = new HashSet<>();
+        for (Map.Entry<String, String> entry : stateMap.toMap().entrySet()) {
+            if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
+                keys.add(entry.getValue());
+            }
+        }
+        return keys;
+    }
+
+    private void restoreState(final ProcessContext context) throws IOException {
+        final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
+        if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX + "0") == null) {
+            currentTimestamp = 0L;
+            currentKeys = new HashSet<>();
+        } else {
+            currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
+            currentKeys = extractKeys(stateMap);
+        }
+    }
+
+    private void persistState(final ProcessContext context) {
+        Map<String, String> state = new HashMap<>();
+        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
+        int i = 0;
+        for (String key : currentKeys) {
+            state.put(CURRENT_KEY_PREFIX + i, key);
+            i++;
+        }
+        try {
+            context.getStateManager().setState(state, Scope.CLUSTER);
+        } catch (IOException ioe) {
+            getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        try {
+            restoreState(context);
+        } catch (IOException ioe) {
+            getLogger().error("Failed to restore processor state; yielding", ioe);
+            context.yield();
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+
+        final AmazonS3 client = getClient();
+        int listCount = 0;
+        long maxTimestamp = 0L;
+        String delimiter = context.getProperty(DELIMITER).getValue();
+        String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket);
+        if (delimiter != null && !delimiter.isEmpty()) {
+            listObjectsRequest.setDelimiter(delimiter);
+        }
+        if (prefix != null && !prefix.isEmpty()) {
+            listObjectsRequest.setPrefix(prefix);
+        }
+
+        ObjectListing objectListing;
+        do {
+            objectListing = client.listObjects(listObjectsRequest);
+            for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+                long lastModified = objectSummary.getLastModified().getTime();
+                if (lastModified < currentTimestamp
+                        || lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) {
+                    continue;
+                }
+
+                // Create the attributes
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey());
+                attributes.put("s3.bucket", objectSummary.getBucketName());
+                if (objectSummary.getOwner() != null) { // We may not have permission to read the owner
+                    attributes.put("s3.owner", objectSummary.getOwner().getId());
+                }
+                attributes.put("s3.etag", objectSummary.getETag());
+                attributes.put("s3.lastModified", String.valueOf(lastModified));
+                attributes.put("s3.length", String.valueOf(objectSummary.getSize()));
+                attributes.put("s3.storeClass", objectSummary.getStorageClass());
+
+                // Create the flowfile
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+
+                // Update state
+                if (lastModified > maxTimestamp) {
+                    maxTimestamp = lastModified;
+                    currentKeys.clear();
+                }
+                if (lastModified == maxTimestamp) {
+                    currentKeys.add(objectSummary.getKey());
+                }
+                listCount++;
+            }
+            listObjectsRequest.setMarker(objectListing.getNextMarker());
+        } while (objectListing.isTruncated());
+        currentTimestamp = maxTimestamp;
+
+        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
+
+        if (listCount > 0) {
+            getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[]{listCount});
+            session.commit();
+            persistState(context);
+        } else {
+            getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
+            context.yield();
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 50d2453167..257b731bf6 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -80,7 +80,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 
 @SupportsBatching
-@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket\n" +
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9eb1c7b599..d8c18fc60d 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,6 +15,7 @@
 org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
 org.apache.nifi.processors.aws.s3.DeleteS3Object
+org.apache.nifi.processors.aws.s3.ListS3
 org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 697e2f2ced..95b4ba8c55 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -47,6 +47,7 @@ import static org.junit.Assert.fail;
  * @see ITDeleteS3Object
  * @see ITFetchS3Object
  * @see ITPutS3Object
+ * @see ITListS3
  */
 public abstract class AbstractS3IT {
     protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
new file mode 100644
index 0000000000..6d77eb6790
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Provides integration level testing with actual AWS S3 resources for {@link ListS3} and requires additional configuration and resources to work.
+ */
+public class ITListS3 extends AbstractS3IT {
+    @Test
+    public void testSimpleList() throws IOException {
+        putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        flowFiles.get(0).assertAttributeEquals("filename", "a");
+        flowFiles.get(1).assertAttributeEquals("filename", "b/c");
+        flowFiles.get(2).assertAttributeEquals("filename", "d/e");
+    }
+
+    @Test
+    public void testSimpleListUsingCredentialsProviderService() throws Throwable {
+        putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
+
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
+        runner.enableControllerService(serviceImpl);
+        runner.assertValid(serviceImpl);
+
+        runner.setProperty(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        flowFiles.get(0).assertAttributeEquals("filename", "a");
+        flowFiles.get(1).assertAttributeEquals("filename", "b/c");
+        flowFiles.get(2).assertAttributeEquals("filename", "d/e");
+    }
+
+    @Test
+    public void testSimpleListWithDelimiter() throws Throwable {
+        putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+        runner.setProperty(ListS3.DELIMITER, "/");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        flowFiles.get(0).assertAttributeEquals("filename", "a");
+    }
+
+    @Test
+    public void testSimpleListWithPrefix() throws Throwable {
+        putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+        runner.setProperty(ListS3.PREFIX, "b/");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        flowFiles.get(0).assertAttributeEquals("filename", "b/c");
+    }
+
+    @Test
+    public void testGetPropertyDescriptors() throws Exception {
+        ListS3 processor = new ListS3();
+        List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
+        assertEquals("size should be eq", 13, pd.size());
+        assertTrue(pd.contains(ListS3.ACCESS_KEY));
+        assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
+        assertTrue(pd.contains(ListS3.BUCKET));
+        assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
+        assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
+        assertTrue(pd.contains(ListS3.REGION));
+        assertTrue(pd.contains(ListS3.SECRET_KEY));
+        assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
+        assertTrue(pd.contains(ListS3.TIMEOUT));
+        assertTrue(pd.contains(ListS3.PROXY_HOST));
+        assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
+        assertTrue(pd.contains(ListS3.DELIMITER));
+        assertTrue(pd.contains(ListS3.PREFIX));
+    }
+}
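
The core of ListS3 is its de-duplication rule: an object is emitted only when its lastModified is newer than the stored timestamp, or equal to it while its key is absent from the stored key set. The standalone sketch below (hypothetical code, not part of the patch; the class and method names are invented for illustration) isolates that condition:

import java.util.HashSet;
import java.util.Set;

public class ListingFilterSketch {

    // Mirrors the skip condition in ListS3.onTrigger(): skip anything strictly
    // older than the stored timestamp, and anything at the stored timestamp
    // whose key was already emitted by a previous listing.
    static boolean shouldSkip(long lastModified, long currentTimestamp,
                              Set<String> currentKeys, String key) {
        return lastModified < currentTimestamp
                || (lastModified == currentTimestamp && currentKeys.contains(key));
    }

    public static void main(String[] args) {
        Set<String> keysAtNewestTimestamp = new HashSet<>();
        keysAtNewestTimestamp.add("b/c");
        long stored = 1457222534000L; // example value restored from cluster state

        System.out.println(shouldSkip(stored - 1000L, stored, keysAtNewestTimestamp, "a"));  // true: older than state
        System.out.println(shouldSkip(stored, stored, keysAtNewestTimestamp, "b/c"));        // true: already listed
        System.out.println(shouldSkip(stored, stored, keysAtNewestTimestamp, "d/e"));        // false: new key, same timestamp
    }
}

Tracking the set of keys that share the newest timestamp, rather than the timestamp alone, is what prevents both duplicates and missed objects when several keys carry the same lastModified value.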