NIFI-13430 Added CopyS3Object and GetS3ObjectMetadata

This closes #8992

Co-authored-by: David Handermann <exceptionfactory@apache.org>
Signed-off-by: David Handermann <exceptionfactory@apache.org>
Mike Thomsen 2024-06-20 09:32:29 -04:00 committed by exceptionfactory
parent 89c757999b
commit 99f3717f13
10 changed files with 682 additions and 5 deletions

@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.List;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, FetchS3Object.class})
public class CopyS3Object extends AbstractS3Processor {
static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
.name("Source Bucket")
.displayName("Source Bucket")
.description("The bucket that contains the file to be copied.")
.build();
static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(KEY)
.name("Source Key")
.displayName("Source Key")
.description("The source key in the source bucket")
.build();
static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
.name("Destination Bucket")
.displayName("Destination Bucket")
.description("The bucket that will receive the copy.")
.build();
static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(KEY)
.name("Destination Key")
.displayName("Destination Key")
.description("The target key in the target bucket")
.defaultValue("${filename}-1")
.build();
static final List<PropertyDescriptor> properties = List.of(
SOURCE_BUCKET,
SOURCE_KEY,
DESTINATION_BUCKET,
DESTINATION_KEY,
AWS_CREDENTIALS_PROVIDER_SERVICE,
S3_REGION,
TIMEOUT,
FULL_CONTROL_USER_LIST,
READ_USER_LIST,
WRITE_USER_LIST,
READ_ACL_LIST,
WRITE_ACL_LIST,
CANNED_ACL,
OWNER,
SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE,
S3_CUSTOM_SIGNER_CLASS_NAME,
S3_CUSTOM_SIGNER_MODULE_LOCATION,
PROXY_CONFIGURATION_SERVICE
);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final AmazonS3Client s3;
try {
s3 = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
try {
final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
final AccessControlList acl = createACL(context, flowFile);
if (acl != null) {
request.setAccessControlList(acl);
}
final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
if (cannedAccessControlList != null) {
request.setCannedAccessControlList(cannedAccessControlList);
}
s3.copyObject(request);
session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
session.transfer(flowFile, REL_SUCCESS);
} catch (final AmazonClientException e) {
flowFile = extractExceptionDetails(e, session, flowFile);
getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
session.transfer(flowFile, REL_FAILURE);
}
}
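// Builds the provenance transit URI for the copied object, e.g. s3://staging-bucket/copied.txt;
// the spacer avoids a doubled slash when the destination key already starts with "/".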
private String getTransitUrl(final String destinationBucket, final String destinationKey) {
final String spacer = destinationKey.startsWith("/") ? "" : "/";
return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey);
}
}
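
For context, the copy itself is a single server-side call in the AWS SDK v1; the object content never passes through NiFi. A minimal standalone sketch of the same request shape the processor builds (the bucket names, keys, and canned ACL here are illustrative placeholders, not values from this commit):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;

public class CopyS3ObjectSketch {
    public static void main(String[] args) {
        // Default credential chain and region; the processor resolves these from
        // its credentials controller service and property values instead.
        final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        // Same shape as the request built in CopyS3Object.onTrigger.
        final CopyObjectRequest request =
                new CopyObjectRequest("dev-bucket", "test.txt", "staging-bucket", "copied.txt");

        // Optional ACL, mirroring request.setCannedAccessControlList(...) above.
        request.setCannedAccessControlList(CannedAccessControlList.BucketOwnerFullControl);

        // S3 performs the copy entirely server side.
        s3.copyObject(request);
    }
}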

@@ -48,7 +48,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Deletes a file from an Amazon S3 Bucket. If attempting to delete a file that does not exist, FlowFile is routed to success.")

@@ -65,7 +65,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@SupportsBatching
-@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")

@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " +
"used as a router for work flows that need to check on a file in S3 before proceeding with data processing")
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
public class GetS3ObjectMetadata extends AbstractS3Processor {
static final AllowableValue TARGET_ATTRIBUTES = new AllowableValue("ATTRIBUTES", "Attributes", """
When selected, the metadata will be written to FlowFile attributes with the prefix "s3." following the convention used in other processors. For example,
the standard S3 attribute Content-Type will be written as s3.Content-Type. User-defined metadata
will be included in the attributes added to the FlowFile.
"""
);
static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("FLOWFILE_BODY", "FlowFile Body", "Write the metadata to FlowFile content as JSON data.");
static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder()
.name("Metadata Target")
.description("This determines where the metadata will be written when found.")
.addValidator(Validator.VALID)
.required(true)
.allowableValues(TARGET_ATTRIBUTES, TARGET_FLOWFILE_BODY)
.defaultValue(TARGET_ATTRIBUTES)
.build();
static final PropertyDescriptor ATTRIBUTE_INCLUDE_PATTERN = new PropertyDescriptor.Builder()
.name("Metadata Attribute Include Pattern")
.description("""
A regular expression pattern to use for determining which object metadata entries are included as FlowFile
attributes. This pattern is only applied to the 'found' relationship and will not be used to
filter the error attributes in the 'failure' relationship.
"""
)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue(".*")
.dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES)
.build();
private static final List<PropertyDescriptor> properties = List.of(
METADATA_TARGET,
ATTRIBUTE_INCLUDE_PATTERN,
BUCKET_WITH_DEFAULT_VALUE,
KEY,
AWS_CREDENTIALS_PROVIDER_SERVICE,
S3_REGION,
TIMEOUT,
FULL_CONTROL_USER_LIST,
READ_USER_LIST,
READ_ACL_LIST,
OWNER,
SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE,
S3_CUSTOM_SIGNER_CLASS_NAME,
S3_CUSTOM_SIGNER_MODULE_LOCATION,
PROXY_CONFIGURATION_SERVICE
);
static final Relationship REL_FOUND = new Relationship.Builder()
.name("found")
.description("An object was found in the bucket at the supplied key")
.build();
static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not found")
.description("No object was found in the bucket at the supplied key")
.build();
private static final Set<Relationship> relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE);
private static final String ATTRIBUTE_FORMAT = "s3.%s";
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final AmazonS3Client s3;
try {
s3 = getS3Client(context, flowFile.getAttributes());
} catch (Exception e) {
getLogger().error("Failed to initialize S3 client", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final Pattern attributePattern;
final PropertyValue attributeIncludePatternProperty = context.getProperty(ATTRIBUTE_INCLUDE_PATTERN).evaluateAttributeExpressions(flowFile);
if (attributeIncludePatternProperty.isSet()) {
attributePattern = Pattern.compile(attributeIncludePatternProperty.getValue());
} else {
attributePattern = null;
}
final String metadataTarget = context.getProperty(METADATA_TARGET).getValue();
try {
Relationship relationship;
try {
final ObjectMetadata objectMetadata = s3.getObjectMetadata(bucket, key);
final Map<String, Object> combinedMetadata = new LinkedHashMap<>(objectMetadata.getRawMetadata());
combinedMetadata.putAll(objectMetadata.getUserMetadata());
if (TARGET_ATTRIBUTES.getValue().equals(metadataTarget)) {
final Map<String, String> newAttributes = combinedMetadata
.entrySet().stream()
.filter(e -> {
if (attributePattern == null) {
return true;
} else {
return attributePattern.matcher(e.getKey()).find();
}
})
.collect(Collectors.toMap(e -> ATTRIBUTE_FORMAT.formatted(e.getKey()), e -> {
final Object value = e.getValue();
final String attributeValue;
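// Date-typed values (such as Last-Modified) are flattened to epoch milliseconds; other values use toString().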
if (value instanceof Date dateValue) {
attributeValue = Long.toString(dateValue.getTime());
} else {
attributeValue = value.toString();
}
return attributeValue;
}));
flowFile = session.putAllAttributes(flowFile, newAttributes);
} else if (TARGET_FLOWFILE_BODY.getValue().equals(metadataTarget)) {
flowFile = session.write(flowFile, outputStream -> MAPPER.writeValue(outputStream, combinedMetadata));
}
relationship = REL_FOUND;
} catch (final AmazonS3Exception e) {
if (e.getStatusCode() == 404) {
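// HTTP 404 means the object does not exist at that key; route to not-found instead of failure.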
relationship = REL_NOT_FOUND;
flowFile = extractExceptionDetails(e, session, flowFile);
} else {
throw e;
}
}
session.transfer(flowFile, relationship);
} catch (final AmazonClientException e) {
getLogger().error("Failed to get S3 Object Metadata from Bucket [{}] Key [{}]", bucket, key, e);
flowFile = extractExceptionDetails(e, session, flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
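
The attribute conversion above is plain map manipulation: raw (system) metadata is merged with user metadata, filtered by the include pattern, prefixed with "s3.", and Date values are converted to epoch milliseconds. A minimal standalone sketch of that logic, with made-up metadata entries:

import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class MetadataFlatteningSketch {
    public static void main(String[] args) {
        // Raw metadata first, then user metadata on top, matching the processor's merge order.
        final Map<String, Object> combined = new LinkedHashMap<>();
        combined.put("Content-Type", "text/plain");
        combined.put("Last-Modified", new Date(0));
        combined.putAll(Map.of("project", "nifi"));

        // ".*" mirrors the default of the Metadata Attribute Include Pattern property.
        final Pattern include = Pattern.compile(".*");

        final Map<String, String> attributes = combined.entrySet().stream()
                .filter(entry -> include.matcher(entry.getKey()).find())
                .collect(Collectors.toMap(
                        entry -> "s3.%s".formatted(entry.getKey()),
                        entry -> entry.getValue() instanceof Date date
                                ? Long.toString(date.getTime())   // dates become epoch milliseconds
                                : entry.getValue().toString()));

        // Prints entries such as s3.Content-Type=text/plain and s3.Last-Modified=0 (order may vary).
        System.out.println(attributes);
    }
}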

@@ -106,7 +106,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"Amazon", "S3", "AWS", "list"})
-@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents "
+ "the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only "
+ "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "

@@ -95,7 +95,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RES
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
@SupportsBatching
-@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Writes the contents of a FlowFile as an S3 Object to an Amazon S3 Bucket.")

@@ -59,7 +59,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, DeleteS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Adds or updates a tag on an Amazon S3 Object.")

@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.processors.aws.s3.CopyS3Object
+org.apache.nifi.processors.aws.s3.GetS3ObjectMetadata
org.apache.nifi.processors.aws.s3.FetchS3Object
org.apache.nifi.processors.aws.s3.PutS3Object
org.apache.nifi.processors.aws.s3.DeleteS3Object
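
The two added entries register the new processors in the Java service-provider manifest that NiFi scans when loading a NAR. A hedged sketch of the underlying mechanism (plain ServiceLoader, not NiFi's own loading code):

import java.util.ServiceLoader;

import org.apache.nifi.processor.Processor;

public class ListRegisteredProcessors {
    public static void main(String[] args) {
        // Reads every META-INF/services manifest for org.apache.nifi.processor.Processor
        // on the classpath and instantiates each listed class via its no-arg constructor.
        for (final Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}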

@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestCopyS3Object {
private TestRunner runner;
private AmazonS3Client mockS3Client;
@BeforeEach
void setUp() {
mockS3Client = mock(AmazonS3Client.class);
final CopyS3Object mockCopyS3Object = new CopyS3Object() {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockCopyS3Object);
AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
}
@DisplayName("Test a normal run that SHOULD succeed")
@Test
void testRun() {
runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun());
runner.run();
runner.assertTransferCount(CopyS3Object.REL_SUCCESS, 1);
verify(mockS3Client, times(1))
.copyObject(any(CopyObjectRequest.class));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
}
@DisplayName("Validate that S3 errors cleanly route to failure")
@Test
void testS3ErrorHandling() {
final AmazonS3Exception exception = new AmazonS3Exception("Manually triggered error");
exception.setStatusCode(503);
when(mockS3Client.copyObject(any(CopyObjectRequest.class)))
.thenThrow(exception);
runner.enqueue(new byte[]{}, setupRun());
runner.run();
runner.assertTransferCount(CopyS3Object.REL_FAILURE, 1);
}
private Map<String, String> setupRun() {
runner.setProperty(CopyS3Object.SOURCE_BUCKET, "${s3.bucket.source}");
runner.setProperty(CopyS3Object.SOURCE_KEY, "${s3.key.source}");
runner.setProperty(CopyS3Object.DESTINATION_BUCKET, "${s3.bucket.target}");
runner.setProperty(CopyS3Object.DESTINATION_KEY, "${s3.key.target}");
return Map.of(
"s3.bucket.source", "dev-bucket",
"s3.key.source", "/test.txt",
"s3.bucket.target", "staging-bucket",
"s3.key.target", "/copied.txt"
);
}
}
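
A possible tightening of testRun, not part of this commit, is asserting the captured request fields with Mockito's ArgumentCaptor; a sketch assuming the mock and the attribute values from setupRun are in scope:

import com.amazonaws.services.s3.model.CopyObjectRequest;
import org.mockito.ArgumentCaptor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;

// Inside testRun(), after runner.run():
final ArgumentCaptor<CopyObjectRequest> captor = ArgumentCaptor.forClass(CopyObjectRequest.class);
verify(mockS3Client).copyObject(captor.capture());

final CopyObjectRequest request = captor.getValue();
assertEquals("dev-bucket", request.getSourceBucketName());
assertEquals("/test.txt", request.getSourceKey());
assertEquals("staging-bucket", request.getDestinationBucketName());
assertEquals("/copied.txt", request.getDestinationKey());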

@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class TestGetS3ObjectMetadata {
private TestRunner runner;
private AmazonS3Client mockS3Client;
private ObjectMetadata mockMetadata;
@BeforeEach
void setUp() {
mockS3Client = mock(AmazonS3Client.class);
GetS3ObjectMetadata mockGetS3ObjectMetadata = new GetS3ObjectMetadata() {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
return mockS3Client;
}
};
runner = TestRunners.newTestRunner(mockGetS3ObjectMetadata);
AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
mockMetadata = mock(ObjectMetadata.class);
Map<String, String> user = Map.of("x", "y");
Map<String, Object> raw = Map.of("a", "b");
when(mockMetadata.getUserMetadata()).thenReturn(user);
when(mockMetadata.getRawMetadata()).thenReturn(raw);
}
private void run() {
runner.setProperty(GetS3ObjectMetadata.BUCKET_WITH_DEFAULT_VALUE, "${s3.bucket}");
runner.setProperty(GetS3ObjectMetadata.KEY, "${filename}");
runner.enqueue("", Map.of("s3.bucket", "test-data", "filename", "test.txt"));
runner.run();
}
private Map<String, Object> setupObjectMetadata() {
Map<String, Object> rawMetadata = new HashMap<>();
rawMetadata.put("raw1", "x");
rawMetadata.put("raw2", "y");
Map<String, String> userMetadata = new HashMap<>();
userMetadata.put("user1", "a");
userMetadata.put("user2", "b");
userMetadata.put("mighthaveto", "excludemelater");
Map<String, Object> combined = new HashMap<>(rawMetadata);
combined.putAll(userMetadata);
when(mockMetadata.getRawMetadata()).thenReturn(rawMetadata);
when(mockMetadata.getUserMetadata()).thenReturn(userMetadata);
when(mockS3Client.getObjectMetadata(anyString(), anyString()))
.thenReturn(mockMetadata);
return combined;
}
@DisplayName("Validate fetch metadata to attribute routes to found when the file exists")
@Test
void testFetchMetadataToAttributeExists() {
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "");
Map<String, Object> combined = setupObjectMetadata();
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
combined.forEach((k, v) -> {
String key = String.format("s3.%s", k);
String val = flowFile.getAttribute(key);
assertEquals(v, val);
});
}
@DisplayName("Validate attribution exclusion")
@Test
void testFetchMetadataToAttributeExclusion() {
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "(raw|user)");
Map<String, Object> metadata = setupObjectMetadata();
Map<String, Object> musthave = new HashMap<>(metadata);
musthave.remove("mighthaveto");
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
musthave.forEach((k, v) -> {
String key = String.format("s3.%s", k);
String val = flowFile.getAttribute(key);
assertEquals(v, val);
});
assertNull(flowFile.getAttribute("s3.mighthaveto"));
}
@DisplayName("Validate fetch to attribute mode routes to failure on S3 error")
@Test
void testFetchMetadataToAttributeS3Error() {
AmazonS3Exception exception = new AmazonS3Exception("test");
exception.setStatusCode(501);
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
when(mockS3Client.getObjectMetadata(anyString(), anyString()))
.thenThrow(exception);
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
}
@DisplayName("Validate fetch metadata to attribute routes to not-found when when the file doesn't exist")
@Test
void testFetchMetadataToAttributeNotExist() {
AmazonS3Exception exception = new AmazonS3Exception("test");
exception.setStatusCode(404);
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
when(mockS3Client.getObjectMetadata(anyString(), anyString()))
.thenThrow(exception);
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
}
@DisplayName("Validate fetch metadata to FlowFile body routes to found when the file exists")
@Test
void testFetchMetadataToBodyExists() {
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
when(mockS3Client.getObjectMetadata(anyString(), anyString()))
.thenReturn(mockMetadata);
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
}
@DisplayName("Validate fetch metadata to FlowFile body routes to not-found when when the file doesn't exist")
@Test
void testFetchMetadataToBodyNotExist() {
AmazonS3Exception exception = new AmazonS3Exception("test");
exception.setStatusCode(404);
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
when(mockS3Client.getObjectMetadata(anyString(), anyString()))
.thenThrow(exception);
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
}
@DisplayName("Validate fetch to FlowFile body mode routes to failure on S3 error")
@Test
void testFetchMetadataToBodyS3Error() {
AmazonS3Exception exception = new AmazonS3Exception("test");
exception.setStatusCode(501);
runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
when(mockS3Client.getObjectMetadata(anyString(), anyString()))
.thenThrow(exception);
run();
runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
}
}
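
Similarly, testFetchMetadataToBodyExists only asserts routing; a hedged sketch of also checking the JSON written in FlowFile Body mode, reusing the stubbed metadata from setUp (raw {a=b} plus user {x=y}) and assuming the test method declares throws Exception for Jackson's checked exception:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.nifi.util.MockFlowFile;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Inside testFetchMetadataToBodyExists(), after the transfer-count assertion:
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
final Map<String, Object> written = new ObjectMapper().readValue(
        flowFile.getContent(), new TypeReference<Map<String, Object>>() { });
assertEquals(Map.of("a", "b", "x", "y"), written);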