diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
new file mode 100644
index 0000000000..9f886bda50
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+
+    static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("Source Bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+
+    static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("Source Key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket.")
+            .build();
+
+    static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("Destination Bucket")
+            .displayName("Destination Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("Destination Key")
+            .displayName("Destination Key")
+            .description("The target key in the target bucket.")
+            .defaultValue("${filename}-1")
+            .build();
+
+    static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            DESTINATION_BUCKET,
+            DESTINATION_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+            final AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+
+            final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final AmazonClientException e) {
+            flowFile = extractExceptionDetails(e, session, flowFile);
+            getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    private String getTransitUrl(final String destinationBucket, final String destinationKey) {
+        final String spacer = destinationKey.startsWith("/") ? "" : "/";
+        return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7e4694ed79..e885b6b56b 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -48,7 +48,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
         @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
         @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
         @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Deletes a file from an Amazon S3 Bucket. If attempting to delete a file that does not exist, FlowFile is routed to success.")
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 67fe18280b..05a5224fac 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -65,7 +65,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
 
 @SupportsBatching
-@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
 @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
new file mode 100644
index 0000000000..9793e81afd
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " +
+        "used as a router for workflows that need to check on a file in S3 before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+
+    static final AllowableValue TARGET_ATTRIBUTES = new AllowableValue("ATTRIBUTES", "Attributes", """
+            When selected, the metadata will be written to FlowFile attributes with the prefix "s3." following the convention used in other processors. For example:
+            the standard S3 attribute Content-Type will be written as s3.Content-Type when using the default value. User-defined metadata
+            will be included in the attributes added to the FlowFile
+            """
+    );
+
+    static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("FLOWFILE_BODY", "FlowFile Body", "Write the metadata to FlowFile content as JSON data.");
+
+    static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written when found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTES, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_INCLUDE_PATTERN = new PropertyDescriptor.Builder()
+            .name("Metadata Attribute Include Pattern")
+            .description("""
+                    A regular expression pattern to use for determining which object metadata entries are included as FlowFile
+                    attributes. This pattern is only applied to the 'found' relationship and will not be used to
+                    filter the error attributes in the 'failure' relationship.
+                    """
+            )
+            .addValidator(Validator.VALID)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue(".*")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES)
+            .build();
+
+    private static final List<PropertyDescriptor> properties = List.of(
+            METADATA_TARGET,
+            ATTRIBUTE_INCLUDE_PATTERN,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    static final Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied key")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket at the supplied key")
+            .build();
+
+    private static final Set<Relationship> relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE);
+
+    private static final String ATTRIBUTE_FORMAT = "s3.%s";
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final Pattern attributePattern;
+
+        final PropertyValue attributeIncludePatternProperty = context.getProperty(ATTRIBUTE_INCLUDE_PATTERN).evaluateAttributeExpressions(flowFile);
+        if (attributeIncludePatternProperty.isSet()) {
+            attributePattern = Pattern.compile(attributeIncludePatternProperty.getValue());
+        } else {
+            attributePattern = null;
+        }
+
+        final String metadataTarget = context.getProperty(METADATA_TARGET).getValue();
+
+        try {
+            Relationship relationship;
+
+            try {
+                final ObjectMetadata objectMetadata = s3.getObjectMetadata(bucket, key);
+                final Map<String, Object> combinedMetadata = new LinkedHashMap<>(objectMetadata.getRawMetadata());
+                combinedMetadata.putAll(objectMetadata.getUserMetadata());
+
+                if (TARGET_ATTRIBUTES.getValue().equals(metadataTarget)) {
+                    final Map<String, String> newAttributes = combinedMetadata
+                            .entrySet().stream()
+                            .filter(e -> {
+                                if (attributePattern == null) {
+                                    return true;
+                                } else {
+                                    return attributePattern.matcher(e.getKey()).find();
+                                }
+                            })
+                            .collect(Collectors.toMap(e -> ATTRIBUTE_FORMAT.formatted(e.getKey()), e -> {
+                                final Object value = e.getValue();
+                                final String attributeValue;
+                                if (value instanceof Date dateValue) {
+                                    attributeValue = Long.toString(dateValue.getTime());
+                                } else {
+                                    attributeValue = value.toString();
+                                }
+                                return attributeValue;
+                            }));
+
+                    flowFile = session.putAllAttributes(flowFile, newAttributes);
+                } else if (TARGET_FLOWFILE_BODY.getValue().equals(metadataTarget)) {
+                    flowFile = session.write(flowFile, outputStream -> MAPPER.writeValue(outputStream, combinedMetadata));
+                }
+
+                relationship = REL_FOUND;
+            } catch (final AmazonS3Exception e) {
+                if (e.getStatusCode() == 404) {
+                    relationship = REL_NOT_FOUND;
+                    flowFile = extractExceptionDetails(e, session, flowFile);
+                } else {
+                    throw e;
+                }
+            }
+
+            session.transfer(flowFile, relationship);
+        } catch (final AmazonClientException e) {
+            getLogger().error("Failed to get S3 Object Metadata from Bucket [{}] Key [{}]", bucket, key, e);
+            flowFile = extractExceptionDetails(e, session, flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index b703b14c26..938a3fc05e 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -106,7 +106,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"Amazon", "S3", "AWS", "list"})
-@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For each object that is listed, creates a FlowFile that represents " +
         "the object so that it can be fetched in conjunction with FetchS3Object. This Processor is designed to run on Primary Node only " +
         "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index baeee681c1..cd695b4f06 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -95,7 +95,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RES
 import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
 
 @SupportsBatching
-@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Writes the contents of a FlowFile as an S3 Object to an Amazon S3 Bucket.")
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index d62716a465..2de04ef9a8 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -59,7 +59,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
         @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
         @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
         @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class, CopyS3Object.class, GetS3ObjectMetadata.class, DeleteS3Object.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Adds or updates a tag on an Amazon S3 Object.")
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a56392c9ae..050fbe1843 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+org.apache.nifi.processors.aws.s3.CopyS3Object
+org.apache.nifi.processors.aws.s3.GetS3ObjectMetadata
 org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
 org.apache.nifi.processors.aws.s3.DeleteS3Object
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
new file mode 100644
index 0000000000..d704d40ec4
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCopyS3Object {
+    private TestRunner runner;
+
+    private AmazonS3Client mockS3Client;
+
+    @BeforeEach
+    void setUp() {
+        mockS3Client = mock(AmazonS3Client.class);
+        final CopyS3Object mockCopyS3Object = new CopyS3Object() {
+            @Override
+            protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
+                                                  final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockCopyS3Object);
+        AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+    }
+
+    @DisplayName("Test a normal run that SHOULD succeed")
+    @Test
+    void testRun() {
+        runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun());
+        runner.run();
+
+        runner.assertTransferCount(CopyS3Object.REL_SUCCESS, 1);
+
+        verify(mockS3Client, times(1))
+                .copyObject(any(CopyObjectRequest.class));
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+    }
+
+    @DisplayName("Validate that S3 errors cleanly route to failure")
+    @Test
+    void testS3ErrorHandling() {
+        final AmazonS3Exception exception = new AmazonS3Exception("Manually triggered error");
+        exception.setStatusCode(503);
+        when(mockS3Client.copyObject(any(CopyObjectRequest.class)))
+                .thenThrow(exception);
+
+        runner.enqueue(new byte[]{}, setupRun());
+        runner.run();
+
+        runner.assertTransferCount(CopyS3Object.REL_FAILURE, 1);
+    }
+
+    private Map<String, String> setupRun() {
+        runner.setProperty(CopyS3Object.SOURCE_BUCKET, "${s3.bucket.source}");
+        runner.setProperty(CopyS3Object.SOURCE_KEY, "${s3.key.source}");
+        runner.setProperty(CopyS3Object.DESTINATION_BUCKET, "${s3.bucket.target}");
+        runner.setProperty(CopyS3Object.DESTINATION_KEY, "${s3.key.target}");
+
+        return Map.of(
+                "s3.bucket.source", "dev-bucket",
+                "s3.key.source", "/test.txt",
+                "s3.bucket.target", "staging-bucket",
+                "s3.key.target", "/copied.txt"
+        );
+    }
+}
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
new file mode 100644
index 0000000000..7346b06df2
--- /dev/null
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestGetS3ObjectMetadata {
+    private TestRunner runner;
+
+    private AmazonS3Client mockS3Client;
+
+    private ObjectMetadata mockMetadata;
+
+    @BeforeEach
+    void setUp() {
+        mockS3Client = mock(AmazonS3Client.class);
+        GetS3ObjectMetadata mockGetS3ObjectMetadata = new GetS3ObjectMetadata() {
+            @Override
+            protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
+                                                  final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockGetS3ObjectMetadata);
+        AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+
+        mockMetadata = mock(ObjectMetadata.class);
+        Map<String, String> user = Map.of("x", "y");
+        Map<String, Object> raw = Map.of("a", "b");
+        when(mockMetadata.getUserMetadata()).thenReturn(user);
+        when(mockMetadata.getRawMetadata()).thenReturn(raw);
+    }
+
+    private void run() {
+        runner.setProperty(GetS3ObjectMetadata.BUCKET_WITH_DEFAULT_VALUE, "${s3.bucket}");
+        runner.setProperty(GetS3ObjectMetadata.KEY, "${filename}");
+        runner.enqueue("", Map.of("s3.bucket", "test-data", "filename", "test.txt"));
+
+        runner.run();
+    }
+
+    private Map<String, Object> setupObjectMetadata() {
+        Map<String, Object> rawMetadata = new HashMap<>();
+        rawMetadata.put("raw1", "x");
+        rawMetadata.put("raw2", "y");
+        Map<String, String> userMetadata = new HashMap<>();
+        userMetadata.put("user1", "a");
+        userMetadata.put("user2", "b");
+        userMetadata.put("mighthaveto", "excludemelater");
+        Map<String, Object> combined = new HashMap<>(rawMetadata);
+        combined.putAll(userMetadata);
+
+        when(mockMetadata.getRawMetadata()).thenReturn(rawMetadata);
+        when(mockMetadata.getUserMetadata()).thenReturn(userMetadata);
+
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenReturn(mockMetadata);
+
+        return combined;
+    }
+
+    @DisplayName("Validate fetch metadata to attribute routes to found when the file exists")
+    @Test
+    void testFetchMetadataToAttributeExists() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "");
+
+        Map<String, Object> combined = setupObjectMetadata();
+
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
+        combined.forEach((k, v) -> {
+            String key = String.format("s3.%s", k);
+            String val = flowFile.getAttribute(key);
+            assertEquals(v, val);
+        });
+    }
+
+    @DisplayName("Validate attribute exclusion")
+    @Test
+    void testFetchMetadataToAttributeExclusion() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "(raw|user)");
+
+        Map<String, Object> metadata = setupObjectMetadata();
+        Map<String, Object> musthave = new HashMap<>(metadata);
+        musthave.remove("mighthaveto");
+
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
+        musthave.forEach((k, v) -> {
+            String key = String.format("s3.%s", k);
+            String val = flowFile.getAttribute(key);
+            assertEquals(v, val);
+        });
+
+        assertNull(flowFile.getAttribute("s3.mighthaveto"));
+    }
+
+    @DisplayName("Validate fetch to attribute mode routes to failure on S3 error")
+    @Test
+    void testFetchMetadataToAttributeS3Error() {
+        AmazonS3Exception exception = new AmazonS3Exception("test");
+        exception.setStatusCode(501);
+
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenThrow(exception);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
+    }
+
+    @DisplayName("Validate fetch metadata to attribute routes to not-found when the file doesn't exist")
+    @Test
+    void testFetchMetadataToAttributeNotExist() {
+        AmazonS3Exception exception = new AmazonS3Exception("test");
+        exception.setStatusCode(404);
+
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenThrow(exception);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
+    }
+
+    @DisplayName("Validate fetch metadata to FlowFile body routes to found when the file exists")
+    @Test
+    void testFetchMetadataToBodyExists() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenReturn(mockMetadata);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+    }
+
+    @DisplayName("Validate fetch metadata to FlowFile body routes to not-found when the file doesn't exist")
+    @Test
+    void testFetchMetadataToBodyNotExist() {
+        AmazonS3Exception exception = new AmazonS3Exception("test");
+        exception.setStatusCode(404);
+
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenThrow(exception);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
+    }
+
+    @DisplayName("Validate fetch to FlowFile body mode routes to failure on S3 error")
+    @Test
+    void testFetchMetadataToBodyS3Error() {
+        AmazonS3Exception exception = new AmazonS3Exception("test");
+        exception.setStatusCode(501);
+
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenThrow(exception);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
+    }
+}
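
For reference, the attribute-mapping rule exercised by the tests above can be illustrated with a small standalone sketch: metadata keys matching the include pattern are prefixed with "s3.", and Date values become epoch milliseconds, mirroring the mapping in GetS3ObjectMetadata's onTrigger. The sketch is not part of the patch; it uses only JDK types, and the class name, metadata map, and pattern are hypothetical examples.

import java.util.Date;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class MetadataAttributeMappingSketch {
    public static void main(String[] args) {
        // Hypothetical metadata, standing in for the combined raw and user metadata of an S3 object.
        final Map<String, Object> metadata = Map.of(
                "Content-Type", "text/plain",
                "Content-Length", 42L,
                "Last-Modified", new Date(0L),
                "user1", "a");
        // Plays the role of the "Metadata Attribute Include Pattern" property (default ".*").
        final Pattern includePattern = Pattern.compile("Content|Last|user");

        final Map<String, String> attributes = metadata.entrySet().stream()
                .filter(entry -> includePattern.matcher(entry.getKey()).find())
                .collect(Collectors.toMap(
                        entry -> "s3.%s".formatted(entry.getKey()),
                        entry -> entry.getValue() instanceof Date date
                                ? Long.toString(date.getTime())   // dates become epoch milliseconds
                                : entry.getValue().toString()));

        // Prints s3.Content-Type=text/plain, s3.Content-Length=42, s3.Last-Modified=0, s3.user1=a (order not guaranteed).
        attributes.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}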