mirror of https://github.com/apache/nifi.git
NIFI-840: Create ListS3 processor
This closes #238 Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
46f8693bc6
commit
ddf8c6f8fa
|
@ -40,7 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({PutS3Object.class, FetchS3Object.class})
|
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
|
||||||
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
|
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
|
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
|
||||||
|
|
|
@ -46,7 +46,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({PutS3Object.class, DeleteS3Object.class})
|
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
|
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
|
||||||
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
|
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
|
||||||
|
|
|
@ -0,0 +1,235 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
|
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||||
|
import com.amazonaws.services.s3.model.ObjectListing;
|
||||||
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.Stateful;
|
||||||
|
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||||
|
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.state.Scope;
|
||||||
|
import org.apache.nifi.components.state.StateMap;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@TriggerSerially
|
||||||
|
@TriggerWhenEmpty
|
||||||
|
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||||
|
@Tags({"Amazon", "S3", "AWS", "list"})
|
||||||
|
@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents "
|
||||||
|
+ "the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only "
|
||||||
|
+ "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
|
||||||
|
+ "all of the data.")
|
||||||
|
@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of keys, the timestamp of the newest key is stored, "
|
||||||
|
+ "along with the keys that share that same timestamp. This allows the Processor to list only keys that have been added or modified after "
|
||||||
|
+ "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
|
||||||
|
+ "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
|
||||||
|
@WritesAttribute(attribute = "filename", description = "The name of the file"),
|
||||||
|
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
|
||||||
|
@WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
|
||||||
|
@WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
|
||||||
|
@WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
|
||||||
|
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
|
||||||
|
public class ListS3 extends AbstractS3Processor {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
|
||||||
|
.name("delimiter")
|
||||||
|
.displayName("Delimiter")
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.description("The string used to delimit directories within the bucket. Please consult the AWS documentation " +
|
||||||
|
"for the correct use of this field.")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
|
||||||
|
.name("prefix")
|
||||||
|
.displayName("Prefix")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
|
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
|
||||||
|
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
||||||
|
PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
|
||||||
|
|
||||||
|
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
|
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
||||||
|
|
||||||
|
public static final String CURRENT_TIMESTAMP = "currentTimestamp";
|
||||||
|
public static final String CURRENT_KEY_PREFIX = "key-";
|
||||||
|
|
||||||
|
// State tracking
|
||||||
|
private long currentTimestamp = 0L;
|
||||||
|
private Set<String> currentKeys;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<String> extractKeys(final StateMap stateMap) {
|
||||||
|
Set<String> keys = new HashSet<>();
|
||||||
|
for (Map.Entry<String, String> entry : stateMap.toMap().entrySet()) {
|
||||||
|
if (entry.getKey().startsWith(CURRENT_KEY_PREFIX)) {
|
||||||
|
keys.add(entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keys;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void restoreState(final ProcessContext context) throws IOException {
|
||||||
|
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
|
||||||
|
if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX+"0") == null) {
|
||||||
|
currentTimestamp = 0L;
|
||||||
|
currentKeys = new HashSet<>();
|
||||||
|
} else {
|
||||||
|
currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
|
||||||
|
currentKeys = extractKeys(stateMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void persistState(final ProcessContext context) {
|
||||||
|
Map<String, String> state = new HashMap<>();
|
||||||
|
state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
|
||||||
|
int i = 0;
|
||||||
|
for (String key : currentKeys) {
|
||||||
|
state.put(CURRENT_KEY_PREFIX+i, key);
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
context.getStateManager().setState(state, Scope.CLUSTER);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
getLogger().error("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
|
try {
|
||||||
|
restoreState(context);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
getLogger().error("Failed to restore processor state; yielding", ioe);
|
||||||
|
context.yield();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final long startNanos = System.nanoTime();
|
||||||
|
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
|
final AmazonS3 client = getClient();
|
||||||
|
int listCount = 0;
|
||||||
|
long maxTimestamp = 0L;
|
||||||
|
String delimiter = context.getProperty(DELIMITER).getValue();
|
||||||
|
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
|
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket);
|
||||||
|
if (delimiter != null && !delimiter.isEmpty()) {
|
||||||
|
listObjectsRequest.setDelimiter(delimiter);
|
||||||
|
}
|
||||||
|
if (prefix != null && !prefix.isEmpty()) {
|
||||||
|
listObjectsRequest.setPrefix(prefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectListing objectListing;
|
||||||
|
do {
|
||||||
|
objectListing = client.listObjects(listObjectsRequest);
|
||||||
|
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
|
||||||
|
long lastModified = objectSummary.getLastModified().getTime();
|
||||||
|
if (lastModified < currentTimestamp
|
||||||
|
|| lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the attributes
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey());
|
||||||
|
attributes.put("s3.bucket", objectSummary.getBucketName());
|
||||||
|
if (objectSummary.getOwner() != null) { // We may not have permission to read the owner
|
||||||
|
attributes.put("s3.owner", objectSummary.getOwner().getId());
|
||||||
|
}
|
||||||
|
attributes.put("s3.etag", objectSummary.getETag());
|
||||||
|
attributes.put("s3.lastModified", String.valueOf(lastModified));
|
||||||
|
attributes.put("s3.length", String.valueOf(objectSummary.getSize()));
|
||||||
|
attributes.put("s3.storeClass", objectSummary.getStorageClass());
|
||||||
|
|
||||||
|
// Create the flowfile
|
||||||
|
FlowFile flowFile = session.create();
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
|
// Update state
|
||||||
|
if (lastModified > maxTimestamp) {
|
||||||
|
maxTimestamp = lastModified;
|
||||||
|
currentKeys.clear();
|
||||||
|
}
|
||||||
|
if (lastModified == maxTimestamp) {
|
||||||
|
currentKeys.add(objectSummary.getKey());
|
||||||
|
}
|
||||||
|
listCount++;
|
||||||
|
}
|
||||||
|
listObjectsRequest.setMarker(objectListing.getNextMarker());
|
||||||
|
} while (objectListing.isTruncated());
|
||||||
|
currentTimestamp = maxTimestamp;
|
||||||
|
|
||||||
|
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
|
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
|
||||||
|
|
||||||
|
if (listCount > 0) {
|
||||||
|
getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
|
||||||
|
session.commit();
|
||||||
|
persistState(context);
|
||||||
|
} else {
|
||||||
|
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
|
||||||
|
context.yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -80,7 +80,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||||
import com.amazonaws.services.s3.model.UploadPartResult;
|
import com.amazonaws.services.s3.model.UploadPartResult;
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
|
@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
||||||
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket\n" +
|
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket\n" +
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
org.apache.nifi.processors.aws.s3.FetchS3Object
|
org.apache.nifi.processors.aws.s3.FetchS3Object
|
||||||
org.apache.nifi.processors.aws.s3.PutS3Object
|
org.apache.nifi.processors.aws.s3.PutS3Object
|
||||||
org.apache.nifi.processors.aws.s3.DeleteS3Object
|
org.apache.nifi.processors.aws.s3.DeleteS3Object
|
||||||
|
org.apache.nifi.processors.aws.s3.ListS3
|
||||||
org.apache.nifi.processors.aws.sns.PutSNS
|
org.apache.nifi.processors.aws.sns.PutSNS
|
||||||
org.apache.nifi.processors.aws.sqs.GetSQS
|
org.apache.nifi.processors.aws.sqs.GetSQS
|
||||||
org.apache.nifi.processors.aws.sqs.PutSQS
|
org.apache.nifi.processors.aws.sqs.PutSQS
|
||||||
|
|
|
@ -47,6 +47,7 @@ import static org.junit.Assert.fail;
|
||||||
* @see ITDeleteS3Object
|
* @see ITDeleteS3Object
|
||||||
* @see ITFetchS3Object
|
* @see ITFetchS3Object
|
||||||
* @see ITPutS3Object
|
* @see ITPutS3Object
|
||||||
|
* @see ITListS3
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractS3IT {
|
public abstract class AbstractS3IT {
|
||||||
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||||
|
|
|
@ -0,0 +1,146 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
||||||
|
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides integration level testing with actual AWS S3 resources for {@link ListS3} and requires additional configuration and resources to work.
|
||||||
|
*/
|
||||||
|
public class ITListS3 extends AbstractS3IT {
|
||||||
|
@Test
|
||||||
|
public void testSimpleList() throws IOException {
|
||||||
|
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||||
|
|
||||||
|
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(ListS3.REGION, REGION);
|
||||||
|
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
|
||||||
|
flowFiles.get(0).assertAttributeEquals("filename", "a");
|
||||||
|
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
|
||||||
|
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleListUsingCredentialsProviderService() throws Throwable {
|
||||||
|
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||||
|
|
||||||
|
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
|
||||||
|
|
||||||
|
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||||
|
|
||||||
|
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
|
||||||
|
runner.enableControllerService(serviceImpl);
|
||||||
|
runner.assertValid(serviceImpl);
|
||||||
|
|
||||||
|
runner.setProperty(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
|
||||||
|
runner.setProperty(ListS3.REGION, REGION);
|
||||||
|
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
|
||||||
|
flowFiles.get(0).assertAttributeEquals("filename", "a");
|
||||||
|
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
|
||||||
|
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleListWithDelimiter() throws Throwable {
|
||||||
|
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||||
|
|
||||||
|
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(ListS3.REGION, REGION);
|
||||||
|
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||||
|
runner.setProperty(ListS3.DELIMITER, "/");
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
|
||||||
|
flowFiles.get(0).assertAttributeEquals("filename", "a");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleListWithPrefix() throws Throwable {
|
||||||
|
putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||||
|
|
||||||
|
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(ListS3.REGION, REGION);
|
||||||
|
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||||
|
runner.setProperty(ListS3.PREFIX, "b/");
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
|
||||||
|
flowFiles.get(0).assertAttributeEquals("filename", "b/c");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
|
ListS3 processor = new ListS3();
|
||||||
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
|
assertEquals("size should be eq", 13, pd.size());
|
||||||
|
assertTrue(pd.contains(ListS3.ACCESS_KEY));
|
||||||
|
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
|
assertTrue(pd.contains(ListS3.BUCKET));
|
||||||
|
assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
|
||||||
|
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
|
||||||
|
assertTrue(pd.contains(ListS3.REGION));
|
||||||
|
assertTrue(pd.contains(ListS3.SECRET_KEY));
|
||||||
|
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
|
||||||
|
assertTrue(pd.contains(ListS3.TIMEOUT));
|
||||||
|
assertTrue(pd.contains(ListS3.PROXY_HOST));
|
||||||
|
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
|
||||||
|
assertTrue(pd.contains(ListS3.DELIMITER));
|
||||||
|
assertTrue(pd.contains(ListS3.PREFIX));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue