From e37cd2b5e631ca3e05dae35168dd4de467d0de14 Mon Sep 17 00:00:00 2001
From: Mike
Date: Tue, 8 Oct 2024 13:26:19 -0500
Subject: [PATCH] NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB

This closes #9418

- Added multipart copying for files over 5 GB

Co-authored-by: Mike Thomsen
Co-authored-by: David Handermann
Signed-off-by: Joseph Witt
---
 .../src/main/resources/META-INF/NOTICE        |   6 +
 .../nifi/processors/aws/s3/CopyS3Object.java  | 139 ++++++++++++++++--
 .../processors/aws/s3/TestCopyS3Object.java   |   8 +
 3 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
index 29939af0f0..133327c742 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
@@ -174,6 +174,12 @@ The following binary components are provided under the Apache Software License v
   Error Prone Annotations
   Copyright 2015 The Error Prone Authors
 
+The multi-part copy code in CopyS3Object was derived from code found in the Amazon S3 documentation. This is the specific file for reference:
+  https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java#L18
+
+  This code was provided under the terms of the Apache Software License V2 per this:
+  https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/LICENSE
+
 ************************
 Eclipse Distribution License 1.0
 ************************
diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
index 9ec64bb46d..7ea3de59d8 100644
--- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
+++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
@@ -16,11 +16,21 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
-import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -30,8 +40,13 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
 
@@ -40,6 +55,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
 @CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
 @SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
 public class CopyS3Object extends AbstractS3Processor {
+    public static final long MULTIPART_THRESHOLD = 5L * 1024L * 1024L * 1024L;
 
     static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
             .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
@@ -120,28 +136,129 @@ public class CopyS3Object extends AbstractS3Processor {
         final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        boolean multipartUploadRequired = false;
+
         try {
-            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+            final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
+            final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
+            final long contentLength = metadataResult.getContentLength();
+            multipartUploadRequired = contentLength > MULTIPART_THRESHOLD;
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (multipartUploadRequired) {
+                copyMultipart(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength);
+            } else {
+                copyObject(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
+            }
 
             session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final Exception e) {
+            if (multipartUploadRequired && StringUtils.isNotEmpty(multipartIdRef.get())) {
+                try {
+                    final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (final AmazonS3Exception s3e) {
+                    getLogger().warn("Abort Multipart Upload failed for Bucket [{}] Key [{}]", destinationBucket, destinationKey, s3e);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /*
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     */
+    private void copyMultipart(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength) {
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket,
+                destinationKey);
+        if (acl != null) {
+            initRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        final InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
+
+        multipartIdRef.set(initResult.getUploadId());
+
+        long bytePosition = 0;
+        int partNumber = 1;
+        final List<CopyPartResult> copyPartResults = new ArrayList<>();
+        while (bytePosition < contentLength) {
+            long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, contentLength - 1);
+
+            final CopyPartRequest copyPartRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(initResult.getUploadId())
+                    .withFirstByte(bytePosition)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber++);
+
+            doRetryLoop(partRequest -> copyPartResults.add(s3.copyPart((CopyPartRequest) partRequest)), copyPartRequest);
+
+            bytePosition += MULTIPART_THRESHOLD;
+        }
+
+        final CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
+                destinationBucket,
+                destinationKey,
+                initResult.getUploadId(),
+                copyPartResults.stream().map(response -> new PartETag(response.getPartNumber(), response.getETag()))
+                        .collect(Collectors.toList()));
+        doRetryLoop(complete -> s3.completeMultipartUpload(completeRequest), completeRequest);
+    }
+
+    private void doRetryLoop(Consumer<AmazonWebServiceRequest> consumer, AmazonWebServiceRequest request) {
+        boolean requestComplete = false;
+        int retryIndex = 0;
+
+        while (!requestComplete) {
+            try {
+                consumer.accept(request);
+                requestComplete = true;
+            } catch (AmazonS3Exception e) {
+                if (e.getStatusCode() == 503 && retryIndex < 3) {
+                    retryIndex++;
+                } else {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private void copyObject(final AmazonS3Client s3, final AccessControlList acl,
+                            final CannedAccessControlList cannedAcl,
+                            final String sourceBucket, final String sourceKey,
+                            final String destinationBucket, final String destinationKey) {
+        final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+
+        if (acl != null) {
+            request.setAccessControlList(acl);
+        }
+
+        if (cannedAcl != null) {
+            request.setCannedAccessControlList(cannedAcl);
+        }
+
+        s3.copyObject(request);
+    }
+
     private String getTransitUrl(final String destinationBucket, final String destinationKey) {
         final String spacer = destinationKey.startsWith("/") ? "" : "/";
"" : "/"; return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey); diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java index d704d40ec4..0856e2750c 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java @@ -24,6 +24,8 @@ import com.amazonaws.regions.Region; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.testutil.AuthUtils; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -56,6 +58,12 @@ public class TestCopyS3Object { @Override protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config, final AwsClientBuilder.EndpointConfiguration endpointConfiguration) { + ObjectMetadata metadata = mock(ObjectMetadata.class); + + when(metadata.getContentLength()).thenReturn(1000L); + when(mockS3Client.getObjectMetadata(any(GetObjectMetadataRequest.class))) + .thenReturn(metadata); + return mockS3Client; } };