NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB

This closes #9418

- Added multipart copying for files over 5 GB

Co-authored-by: Mike Thomsen <mthomsen@apache.org>
Co-authored-by: David Handermann <exceptionfactory@apache.org>
Signed-off-by: Joseph Witt <joewitt@apache.org>
Authored by Mike on 2024-10-08 13:26:19 -05:00; committed by Joseph Witt
parent 80e889305f
commit e37cd2b5e6
3 changed files with 142 additions and 11 deletions
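
The fix follows the low-level multipart copy pattern from the AWS SDK for Java v1: initiate a multipart upload on the destination, issue ranged copy-part requests against the source, then complete the upload from the collected part ETags. A minimal sketch of that pattern, separate from the commit below (the MultipartCopySketch class and its parameter names are illustrative placeholders, not part of this change):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.CopyPartRequest;
    import com.amazonaws.services.s3.model.CopyPartResult;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    import com.amazonaws.services.s3.model.PartETag;
    import java.util.ArrayList;
    import java.util.List;

    public class MultipartCopySketch {
        // Copies src to dst in ranges of up to 5 GB, the largest range S3 accepts per copy-part request.
        public static void copy(final AmazonS3 s3, final String srcBucket, final String srcKey,
                                final String dstBucket, final String dstKey) {
            final long partSize = 5L * 1024L * 1024L * 1024L;
            final long contentLength = s3.getObjectMetadata(srcBucket, srcKey).getContentLength();

            // Start a multipart upload on the destination; every part references this upload ID.
            final String uploadId = s3.initiateMultipartUpload(
                    new InitiateMultipartUploadRequest(dstBucket, dstKey)).getUploadId();

            final List<PartETag> partETags = new ArrayList<>();
            long bytePosition = 0;
            int partNumber = 1;
            while (bytePosition < contentLength) {
                // Each copy-part request copies an inclusive byte range of the source object server-side.
                final long lastByte = Math.min(bytePosition + partSize - 1, contentLength - 1);
                final CopyPartResult result = s3.copyPart(new CopyPartRequest()
                        .withSourceBucketName(srcBucket).withSourceKey(srcKey)
                        .withDestinationBucketName(dstBucket).withDestinationKey(dstKey)
                        .withUploadId(uploadId)
                        .withFirstByte(bytePosition).withLastByte(lastByte)
                        .withPartNumber(partNumber++));
                partETags.add(new PartETag(result.getPartNumber(), result.getETag()));
                bytePosition += partSize;
            }

            // Assemble the destination object from the recorded part ETags.
            s3.completeMultipartUpload(new CompleteMultipartUploadRequest(dstBucket, dstKey, uploadId, partETags));
        }
    }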

NOTICE

@@ -174,6 +174,12 @@ The following binary components are provided under the Apache Software License v2
Error Prone Annotations
Copyright 2015 The Error Prone Authors

The multi-part copy code in CopyS3Object was derived from code found in the Amazon S3 documentation. This is the specific file for reference:
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java#L18
This code was provided under the terms of the Apache Software License V2 per this:
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/LICENSE

************************
Eclipse Distribution License 1.0
************************

CopyS3Object.java

@@ -16,11 +16,21 @@
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.CopyPartResult;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -30,8 +40,13 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@@ -40,6 +55,7 @@ import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, FetchS3Object.class})
public class CopyS3Object extends AbstractS3Processor {
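
    // S3 CopyObject handles at most 5 GB in a single request; larger objects must be copied as multipart uploads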
    public static final long MULTIPART_THRESHOLD = 5L * 1024L * 1024L * 1024L;

    static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
@@ -120,28 +136,129 @@ public class CopyS3Object extends AbstractS3Processor {
final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
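
        // The upload ID is tracked so an in-flight multipart copy can be aborted if the copy fails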
        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
        boolean multipartUploadRequired = false;

        try {
            final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
            final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
            final long contentLength = metadataResult.getContentLength();
            multipartUploadRequired = contentLength > MULTIPART_THRESHOLD;

            final AccessControlList acl = createACL(context, flowFile);
            final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);

            if (multipartUploadRequired) {
                copyMultipart(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
                        destinationKey, multipartIdRef, contentLength);
            } else {
                copyObject(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
            }

            session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Exception e) {
if (multipartUploadRequired && StringUtils.isNotEmpty(multipartIdRef.get())) {
try {
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
s3.abortMultipartUpload(abortRequest);
} catch (final AmazonS3Exception s3e) {
getLogger().warn("Abort Multipart Upload failed for Bucket [{}] Key [{}]", destinationBucket, destinationKey, s3e);
}
}
flowFile = extractExceptionDetails(e, session, flowFile);
getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
session.transfer(flowFile, REL_FAILURE);
}
    }

    /*
* Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
* https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
*/
private void copyMultipart(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
final String sourceBucket, final String sourceKey,
final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
final long contentLength) {
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(destinationBucket, destinationKey);
if (acl != null) {
initRequest.setAccessControlList(acl);
}
if (cannedAccessControlList != null) {
initRequest.setCannedACL(cannedAccessControlList);
}
final InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);
multipartIdRef.set(initResult.getUploadId());
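
        // Copy in ranges of up to MULTIPART_THRESHOLD bytes; 5 GB is also the maximum size S3 accepts per copied part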
long bytePosition = 0;
int partNumber = 1;
final List<CopyPartResult> copyPartResults = new ArrayList<>();
while (bytePosition < contentLength) {
long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1, contentLength - 1);
final CopyPartRequest copyPartRequest = new CopyPartRequest()
.withSourceBucketName(sourceBucket)
.withSourceKey(sourceKey)
.withDestinationBucketName(destinationBucket)
.withDestinationKey(destinationKey)
.withUploadId(initResult.getUploadId())
.withFirstByte(bytePosition)
.withLastByte(lastByte)
.withPartNumber(partNumber++);
doRetryLoop(partRequest -> copyPartResults.add(s3.copyPart((CopyPartRequest) partRequest)), copyPartRequest);
bytePosition += MULTIPART_THRESHOLD;
}
final CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
destinationBucket,
destinationKey,
initResult.getUploadId(),
copyPartResults.stream().map(response -> new PartETag(response.getPartNumber(), response.getETag()))
.collect(Collectors.toList()));
doRetryLoop(complete -> s3.completeMultipartUpload(completeRequest), completeRequest);
}
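
    // Retry helper: re-issues the request on HTTP 503 (S3 "Slow Down") up to three times before rethrowing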
private void doRetryLoop(Consumer<AmazonWebServiceRequest> consumer, AmazonWebServiceRequest request) {
boolean requestComplete = false;
int retryIndex = 0;
while (!requestComplete) {
try {
consumer.accept(request);
requestComplete = true;
} catch (AmazonS3Exception e) {
if (e.getStatusCode() == 503 && retryIndex < 3) {
retryIndex++;
} else {
throw e;
}
}
}
}
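
    // Single-request server-side copy, used for objects at or below the 5 GB threshold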
private void copyObject(final AmazonS3Client s3, final AccessControlList acl,
final CannedAccessControlList cannedAcl,
final String sourceBucket, final String sourceKey,
final String destinationBucket, final String destinationKey) {
final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
if (acl != null) {
request.setAccessControlList(acl);
}
if (cannedAcl != null) {
request.setCannedAccessControlList(cannedAcl);
}
s3.copyObject(request);
}

    private String getTransitUrl(final String destinationBucket, final String destinationKey) {
final String spacer = destinationKey.startsWith("/") ? "" : "/";
        return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey);
    }
}

TestCopyS3Object.java

@@ -24,6 +24,8 @@ import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -56,6 +58,12 @@ public class TestCopyS3Object {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
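            // Stub the source object's metadata so the processor sees a 1000-byte object and takes the single-request copy path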
ObjectMetadata metadata = mock(ObjectMetadata.class);
when(metadata.getContentLength()).thenReturn(1000L);
when(mockS3Client.getObjectMetadata(any(GetObjectMetadataRequest.class)))
.thenReturn(metadata);
return mockS3Client;
}
};