mirror of https://github.com/apache/nifi.git
NIFI-1107: Re-integrate Multipart Upload changes into PutS3Object. 1. add Multipart upload logic to allow resuming an upload after process/instance restart, 2. add local state management to track the part uploaded for a flowfile, 3. add configurable AWS S3 state management to abort orphaned uploads, and 4. adapt to IT test naming.
Pull request updates: * Cleanup property description to reflect processor details, not code specifics. * Only resume an upload from local state if uploadID still exists in S3, otherwise delete local state and start a new upload. * Check that local state from was actually created before trying to delete it. Re-integrate Multipart Upload changes into PutS3Object. 1. add Multipart upload logic to allow resuming an upload after process/instance restart, 2. add local state management to track the part uploaded for a flowfile, 3. add configurable AWS S3 state management to abort orphaned uploads, and 4. adapt to IT test naming. Fixed test issues * added mock client for testing local state without matching S3 state, * updated regex for multipart etag pattern. Reviewed by Tony Kurc (tkurc@apache.org). This closes #192
This commit is contained in:
parent
478226451c
commit
769b044e30
|
@ -17,18 +17,28 @@
|
||||||
package org.apache.nifi.processors.aws.s3;
|
package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.text.DateFormat;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Properties;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
@ -42,6 +52,7 @@ import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
@ -50,31 +61,66 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.services.s3.AmazonS3Client;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
|
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
|
||||||
import com.amazonaws.services.s3.model.AccessControlList;
|
import com.amazonaws.services.s3.model.AccessControlList;
|
||||||
|
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
|
||||||
|
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
|
||||||
|
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||||
|
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
|
||||||
|
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
|
||||||
|
import com.amazonaws.services.s3.model.MultipartUpload;
|
||||||
|
import com.amazonaws.services.s3.model.MultipartUploadListing;
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
import com.amazonaws.services.s3.model.PartETag;
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||||
import com.amazonaws.services.s3.model.StorageClass;
|
import com.amazonaws.services.s3.model.StorageClass;
|
||||||
|
import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||||
|
import com.amazonaws.services.s3.model.UploadPartResult;
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
|
@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
||||||
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
|
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket\n" +
|
||||||
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
|
"The upload uses either the PutS3Object method or PutS3MultipartUpload methods. The PutS3Object method " +
|
||||||
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
|
"send the file in a single synchronous call, but it has a 5GB size limit. Larger files are sent using the " +
|
||||||
|
"multipart upload methods that initiate, transfer the parts, and complete an upload. This multipart process " +
|
||||||
|
"saves state after each step so that a large upload can be resumed with minimal loss if the processor or " +
|
||||||
|
"cluster is stopped and restarted.\n" +
|
||||||
|
"A multipart upload consists of three steps\n" +
|
||||||
|
" 1) initiate upload,\n" +
|
||||||
|
" 2) upload the parts, and\n" +
|
||||||
|
" 3) complete the upload.\n" +
|
||||||
|
"For multipart uploads, the processor saves state locally tracking the upload ID and parts uploaded, which " +
|
||||||
|
"must both be provided to complete the upload.\n" +
|
||||||
|
"The AWS libraries select an endpoint URL based on the AWS region, but this can be overridden with the " +
|
||||||
|
"'Endpoint Override URL' property for use with other S3-compatible endpoints.\n" +
|
||||||
|
"The S3 API specifies that the maximum file size for a PutS3Object upload is 5GB. It also requires that " +
|
||||||
|
"parts in a multipart upload must be at least 5MB in size, except for the last part. These limits are " +
|
||||||
|
"establish the bounds for the Multipart Upload Threshold and Part Size properties.")
|
||||||
|
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
|
||||||
|
value = "The value of a User-Defined Metadata field to add to the S3 Object",
|
||||||
|
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
|
||||||
|
supportsExpressionLanguage = true)
|
||||||
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
|
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
|
@WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
|
||||||
@WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
|
@WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
|
||||||
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
|
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
|
||||||
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
|
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
|
||||||
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"),
|
|
||||||
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
|
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
|
||||||
@WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set")
|
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
|
||||||
|
"the S3 object, if one is set"),
|
||||||
|
@WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " +
|
||||||
|
"the S3 object, if any was set")
|
||||||
})
|
})
|
||||||
public class PutS3Object extends AbstractS3Processor {
|
public class PutS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
|
public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
|
||||||
|
public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
|
||||||
|
public static final String PERSISTENCE_ROOT = "conf/state/";
|
||||||
|
|
||||||
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
|
||||||
.name("Expiration Time Rule")
|
.name("Expiration Time Rule")
|
||||||
.required(false)
|
.required(false)
|
||||||
|
@ -89,9 +135,51 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
.defaultValue(StorageClass.Standard.name())
|
.defaultValue(StorageClass.Standard.name())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder()
|
||||||
|
.name("Multipart Threshold")
|
||||||
|
.description("Specifies the file size threshold for switch from the PutS3Object API to the " +
|
||||||
|
"PutS3MultipartUpload API. Flow files bigger than this limit will be sent using the stateful " +
|
||||||
|
"multipart process.\n" +
|
||||||
|
"The valid range is 50MB to 5GB.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("5 GB")
|
||||||
|
.addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MULTIPART_PART_SIZE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Multipart Part Size")
|
||||||
|
.description("Specifies the part size for use when the PutS3Multipart Upload API is used.\n" +
|
||||||
|
"Flow files will be broken into chunks of this size for the upload process, but the last part " +
|
||||||
|
"sent can be smaller since it is not padded.\n" +
|
||||||
|
"The valid range is 50MB to 5GB.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("5 GB")
|
||||||
|
.addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MULTIPART_S3_AGEOFF_INTERVAL = new PropertyDescriptor.Builder()
|
||||||
|
.name("Multipart Upload AgeOff Interval")
|
||||||
|
.description("Specifies the interval at which existing multipart uploads in AWS S3 will be evaluated " +
|
||||||
|
"for ageoff. When processor is triggered it will initiate the ageoff evaluation if this interval has been " +
|
||||||
|
"exceeded.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("60 min")
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MULTIPART_S3_MAX_AGE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Multipart Upload Max Age Threshold")
|
||||||
|
.description("Specifies the maximum age for existing multipart uploads in AWS S3. When the ageoff " +
|
||||||
|
"process occurs, any upload older than this threshold will be aborted.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("7 days")
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE,
|
||||||
|
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE));
|
||||||
|
|
||||||
final static String S3_BUCKET_KEY = "s3.bucket";
|
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||||
final static String S3_OBJECT_KEY = "s3.key";
|
final static String S3_OBJECT_KEY = "s3.key";
|
||||||
|
@ -102,6 +190,11 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
|
final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
|
||||||
final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
|
final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
|
||||||
final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
|
final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
|
||||||
|
final static String S3_API_METHOD_ATTR_KEY = "s3.apimethod";
|
||||||
|
final static String S3_API_METHOD_PUTOBJECT = "putobject";
|
||||||
|
final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload";
|
||||||
|
|
||||||
|
final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
@ -118,6 +211,144 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected File getPersistenceFile() {
|
||||||
|
return new File(PERSISTENCE_ROOT + getIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean localUploadExistsInS3(final AmazonS3Client s3, final String bucket, final MultipartState localState) {
|
||||||
|
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
|
||||||
|
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
|
||||||
|
for (MultipartUpload upload : listing.getMultipartUploads()) {
|
||||||
|
if (upload.getUploadId().equals(localState.getUploadId())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized MultipartState getLocalStateIfInS3(final AmazonS3Client s3, final String bucket,
|
||||||
|
final String s3ObjectKey) throws IOException {
|
||||||
|
MultipartState currState = getLocalState(s3ObjectKey);
|
||||||
|
if (currState == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (localUploadExistsInS3(s3, bucket, currState)) {
|
||||||
|
getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
|
||||||
|
new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
|
||||||
|
return currState;
|
||||||
|
} else {
|
||||||
|
getLogger().info("Local state for {} with uploadId {} does not exist in S3, deleting local state",
|
||||||
|
new Object[]{s3ObjectKey, currState.getUploadId()});
|
||||||
|
persistLocalState(s3ObjectKey, null);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized MultipartState getLocalState(final String s3ObjectKey) throws IOException {
|
||||||
|
// get local state if it exists
|
||||||
|
final File persistenceFile = getPersistenceFile();
|
||||||
|
|
||||||
|
if (persistenceFile.exists()) {
|
||||||
|
final Properties props = new Properties();
|
||||||
|
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
|
||||||
|
props.load(fis);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
|
||||||
|
"restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (props.containsKey(s3ObjectKey)) {
|
||||||
|
final String localSerialState = props.getProperty(s3ObjectKey);
|
||||||
|
if (localSerialState != null) {
|
||||||
|
try {
|
||||||
|
return new MultipartState(localSerialState);
|
||||||
|
} catch (final RuntimeException rte) {
|
||||||
|
getLogger().warn("Failed to recover local state for {} due to corrupt data in state.", new Object[]{s3ObjectKey, rte.getMessage()});
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized void persistLocalState(final String s3ObjectKey, final MultipartState currState) throws IOException {
|
||||||
|
final String currStateStr = (currState == null) ? null : currState.toString();
|
||||||
|
final File persistenceFile = getPersistenceFile();
|
||||||
|
final File parentDir = persistenceFile.getParentFile();
|
||||||
|
if (!parentDir.exists() && !parentDir.mkdirs()) {
|
||||||
|
throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
|
||||||
|
"could not be created.");
|
||||||
|
}
|
||||||
|
final Properties props = new Properties();
|
||||||
|
if (persistenceFile.exists()) {
|
||||||
|
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
|
||||||
|
props.load(fis);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (currStateStr != null) {
|
||||||
|
currState.setTimestamp(System.currentTimeMillis());
|
||||||
|
props.setProperty(s3ObjectKey, currStateStr);
|
||||||
|
} else {
|
||||||
|
props.remove(s3ObjectKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (props.size() > 0) {
|
||||||
|
try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
|
||||||
|
props.store(fos, null);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
getLogger().error("Could not store state {} due to {}.",
|
||||||
|
new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (persistenceFile.exists()) {
|
||||||
|
try {
|
||||||
|
Files.delete(persistenceFile.toPath());
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
getLogger().error("Could not remove state file {} due to {}.",
|
||||||
|
new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized void removeLocalState(final String s3ObjectKey) throws IOException {
|
||||||
|
persistLocalState(s3ObjectKey, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void ageoffLocalState(long ageCutoff) {
|
||||||
|
// get local state if it exists
|
||||||
|
final File persistenceFile = getPersistenceFile();
|
||||||
|
if (persistenceFile.exists()) {
|
||||||
|
Properties props = new Properties();
|
||||||
|
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
|
||||||
|
props.load(fis);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
getLogger().warn("Failed to ageoff remove local state due to {}",
|
||||||
|
new Object[]{ioe.getMessage()});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Entry<Object,Object> entry: props.entrySet()) {
|
||||||
|
final String key = (String) entry.getKey();
|
||||||
|
final String localSerialState = props.getProperty(key);
|
||||||
|
if (localSerialState != null) {
|
||||||
|
final MultipartState state = new MultipartState(localSerialState);
|
||||||
|
if (state.getTimestamp() < ageCutoff) {
|
||||||
|
getLogger().warn("Removing local state for {} due to exceeding ageoff time",
|
||||||
|
new Object[]{key});
|
||||||
|
try {
|
||||||
|
removeLocalState(key);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
getLogger().warn("Failed to remove local state for {} due to {}",
|
||||||
|
new Object[]{key, ioe.getMessage()});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
FlowFile flowFile = session.get();
|
FlowFile flowFile = session.get();
|
||||||
|
@ -129,13 +360,28 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;
|
||||||
|
|
||||||
final AmazonS3Client s3 = getClient();
|
final AmazonS3Client s3 = getClient();
|
||||||
final FlowFile ff = flowFile;
|
final FlowFile ff = flowFile;
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
|
||||||
attributes.put(S3_BUCKET_KEY, bucket);
|
attributes.put(S3_BUCKET_KEY, bucket);
|
||||||
attributes.put(S3_OBJECT_KEY, key);
|
attributes.put(S3_OBJECT_KEY, key);
|
||||||
|
|
||||||
|
final Long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue();
|
||||||
|
final Long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue();
|
||||||
|
|
||||||
|
final long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If necessary, run age off for existing uploads in AWS S3 and local state
|
||||||
|
*/
|
||||||
|
ageoffS3Uploads(context, s3, now);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Then
|
||||||
|
*/
|
||||||
try {
|
try {
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -145,7 +391,8 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
|
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
|
||||||
objectMetadata.setContentLength(ff.getSize());
|
objectMetadata.setContentLength(ff.getSize());
|
||||||
|
|
||||||
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
|
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
|
||||||
|
.evaluateAttributeExpressions(ff).getValue();
|
||||||
if (expirationRule != null) {
|
if (expirationRule != null) {
|
||||||
objectMetadata.setExpirationTimeRuleId(expirationRule);
|
objectMetadata.setExpirationTimeRuleId(expirationRule);
|
||||||
}
|
}
|
||||||
|
@ -153,7 +400,8 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
final Map<String, String> userMetadata = new HashMap<>();
|
final Map<String, String> userMetadata = new HashMap<>();
|
||||||
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||||
if (entry.getKey().isDynamic()) {
|
if (entry.getKey().isDynamic()) {
|
||||||
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
|
final String value = context.getProperty(
|
||||||
|
entry.getKey()).evaluateAttributeExpressions(ff).getValue();
|
||||||
userMetadata.put(entry.getKey().getName(), value);
|
userMetadata.put(entry.getKey().getName(), value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,34 +410,223 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
objectMetadata.setUserMetadata(userMetadata);
|
objectMetadata.setUserMetadata(userMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
|
if (ff.getSize() <= multipartThreshold) {
|
||||||
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
|
//----------------------------------------
|
||||||
final AccessControlList acl = createACL(context, ff);
|
// single part upload
|
||||||
if (acl != null) {
|
//----------------------------------------
|
||||||
request.setAccessControlList(acl);
|
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
|
||||||
}
|
request.setStorageClass(
|
||||||
|
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
|
||||||
final PutObjectResult result = s3.putObject(request);
|
final AccessControlList acl = createACL(context, ff);
|
||||||
if (result.getVersionId() != null) {
|
if (acl != null) {
|
||||||
attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
|
request.setAccessControlList(acl);
|
||||||
}
|
}
|
||||||
|
|
||||||
attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
|
try {
|
||||||
|
final PutObjectResult result = s3.putObject(request);
|
||||||
final Date expiration = result.getExpirationTime();
|
if (result.getVersionId() != null) {
|
||||||
if (expiration != null) {
|
attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
|
||||||
attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString());
|
}
|
||||||
}
|
if (result.getETag() != null) {
|
||||||
if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
|
attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
|
||||||
attributes.put(S3_STORAGECLASS_ATTR_KEY,
|
}
|
||||||
result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
|
if (result.getExpirationTime() != null) {
|
||||||
}
|
attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
|
||||||
if (userMetadata.size() > 0) {
|
}
|
||||||
List<String> pairs = new ArrayList<String>();
|
if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
|
||||||
for (String userKey : userMetadata.keySet()) {
|
attributes.put(S3_STORAGECLASS_ATTR_KEY,
|
||||||
pairs.add(userKey + "=" + userMetadata.get(userKey));
|
result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
|
||||||
|
}
|
||||||
|
if (userMetadata.size() > 0) {
|
||||||
|
StringBuilder userMetaBldr = new StringBuilder();
|
||||||
|
for (String userKey : userMetadata.keySet()) {
|
||||||
|
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
|
||||||
|
}
|
||||||
|
attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
|
||||||
|
}
|
||||||
|
attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
|
||||||
|
new Object[]{ffFilename, bucket, key, e.getMessage()});
|
||||||
|
throw (e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//----------------------------------------
|
||||||
|
// multipart upload
|
||||||
|
//----------------------------------------
|
||||||
|
|
||||||
|
// load or create persistent state
|
||||||
|
//------------------------------------------------------------
|
||||||
|
MultipartState currentState;
|
||||||
|
try {
|
||||||
|
currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
|
||||||
|
if (currentState != null) {
|
||||||
|
if (currentState.getPartETags().size() > 0) {
|
||||||
|
final PartETag lastETag = currentState.getPartETags().get(
|
||||||
|
currentState.getPartETags().size() - 1);
|
||||||
|
getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
|
||||||
|
"uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
|
||||||
|
"contentLength='{}' partsLoaded={} lastPart={}/{}",
|
||||||
|
new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
|
||||||
|
currentState.getFilePosition(), currentState.getPartSize(),
|
||||||
|
currentState.getStorageClass().toString(),
|
||||||
|
currentState.getContentLength(),
|
||||||
|
currentState.getPartETags().size(),
|
||||||
|
Integer.toString(lastETag.getPartNumber()),
|
||||||
|
lastETag.getETag()});
|
||||||
|
} else {
|
||||||
|
getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
|
||||||
|
"uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
|
||||||
|
"contentLength='{}' no partsLoaded",
|
||||||
|
new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
|
||||||
|
currentState.getFilePosition(), currentState.getPartSize(),
|
||||||
|
currentState.getStorageClass().toString(),
|
||||||
|
currentState.getContentLength()});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
currentState = new MultipartState();
|
||||||
|
currentState.setPartSize(multipartPartSize);
|
||||||
|
currentState.setStorageClass(
|
||||||
|
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
|
||||||
|
currentState.setContentLength(ff.getSize());
|
||||||
|
persistLocalState(cacheKey, currentState);
|
||||||
|
getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
|
||||||
|
new Object[]{ffFilename, bucket, key});
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
getLogger().error("IOException initiating cache state while processing flow files: " +
|
||||||
|
e.getMessage());
|
||||||
|
throw (e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// initiate multipart upload or find position in file
|
||||||
|
//------------------------------------------------------------
|
||||||
|
if (currentState.getUploadId().isEmpty()) {
|
||||||
|
final InitiateMultipartUploadRequest initiateRequest =
|
||||||
|
new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
|
||||||
|
initiateRequest.setStorageClass(currentState.getStorageClass());
|
||||||
|
final AccessControlList acl = createACL(context, ff);
|
||||||
|
if (acl != null) {
|
||||||
|
initiateRequest.setAccessControlList(acl);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
final InitiateMultipartUploadResult initiateResult =
|
||||||
|
s3.initiateMultipartUpload(initiateRequest);
|
||||||
|
currentState.setUploadId(initiateResult.getUploadId());
|
||||||
|
currentState.getPartETags().clear();
|
||||||
|
try {
|
||||||
|
persistLocalState(cacheKey, currentState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().info("Exception saving cache state while processing flow file: " +
|
||||||
|
e.getMessage());
|
||||||
|
throw(new ProcessException("Exception saving cache state", e));
|
||||||
|
}
|
||||||
|
getLogger().info("Success initiating upload flowfile={} available={} position={} " +
|
||||||
|
"length={} bucket={} key={} uploadId={}",
|
||||||
|
new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
|
||||||
|
currentState.getContentLength(), bucket, key,
|
||||||
|
currentState.getUploadId()});
|
||||||
|
if (initiateResult.getUploadId() != null) {
|
||||||
|
attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
|
||||||
|
}
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
|
||||||
|
new Object[]{ffFilename, bucket, key, e.getMessage()});
|
||||||
|
throw(e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (currentState.getFilePosition() > 0) {
|
||||||
|
try {
|
||||||
|
final long skipped = in.skip(currentState.getFilePosition());
|
||||||
|
if (skipped != currentState.getFilePosition()) {
|
||||||
|
getLogger().info("Failure skipping to resume upload flowfile={} " +
|
||||||
|
"bucket={} key={} position={} skipped={}",
|
||||||
|
new Object[]{ffFilename, bucket, key,
|
||||||
|
currentState.getFilePosition(), skipped});
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
|
||||||
|
"key={} position={} reason={}",
|
||||||
|
new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
|
||||||
|
e.getMessage()});
|
||||||
|
throw(new ProcessException(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// upload parts
|
||||||
|
//------------------------------------------------------------
|
||||||
|
long thisPartSize;
|
||||||
|
for (int part = currentState.getPartETags().size() + 1;
|
||||||
|
currentState.getFilePosition() < currentState.getContentLength(); part++) {
|
||||||
|
if (!PutS3Object.this.isScheduled()) {
|
||||||
|
throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
|
||||||
|
" part=" + part + " uploadId=" + currentState.getUploadId());
|
||||||
|
}
|
||||||
|
thisPartSize = Math.min(currentState.getPartSize(),
|
||||||
|
(currentState.getContentLength() - currentState.getFilePosition()));
|
||||||
|
UploadPartRequest uploadRequest = new UploadPartRequest()
|
||||||
|
.withBucketName(bucket)
|
||||||
|
.withKey(key)
|
||||||
|
.withUploadId(currentState.getUploadId())
|
||||||
|
.withInputStream(in)
|
||||||
|
.withPartNumber(part)
|
||||||
|
.withPartSize(thisPartSize);
|
||||||
|
try {
|
||||||
|
UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
|
||||||
|
currentState.addPartETag(uploadPartResult.getPartETag());
|
||||||
|
currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
|
||||||
|
try {
|
||||||
|
persistLocalState(cacheKey, currentState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().info("Exception saving cache state processing flow file: " +
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
getLogger().info("Success uploading part flowfile={} part={} available={} " +
|
||||||
|
"etag={} uploadId={}", new Object[]{ffFilename, part, in.available(),
|
||||||
|
uploadPartResult.getETag(), currentState.getUploadId()});
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
|
||||||
|
"reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
|
||||||
|
throw (e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// complete multipart upload
|
||||||
|
//------------------------------------------------------------
|
||||||
|
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
|
||||||
|
bucket, key, currentState.getUploadId(), currentState.getPartETags());
|
||||||
|
try {
|
||||||
|
CompleteMultipartUploadResult completeResult =
|
||||||
|
s3.completeMultipartUpload(completeRequest);
|
||||||
|
getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
|
||||||
|
new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
|
||||||
|
if (completeResult.getVersionId() != null) {
|
||||||
|
attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
|
||||||
|
}
|
||||||
|
if (completeResult.getETag() != null) {
|
||||||
|
attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
|
||||||
|
}
|
||||||
|
if (completeResult.getExpirationTime() != null) {
|
||||||
|
attributes.put(S3_EXPIRATION_ATTR_KEY,
|
||||||
|
completeResult.getExpirationTime().toString());
|
||||||
|
}
|
||||||
|
if (currentState.getStorageClass() != null) {
|
||||||
|
attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
|
||||||
|
}
|
||||||
|
if (userMetadata.size() > 0) {
|
||||||
|
StringBuilder userMetaBldr = new StringBuilder();
|
||||||
|
for (String userKey : userMetadata.keySet()) {
|
||||||
|
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
|
||||||
|
}
|
||||||
|
attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
|
||||||
|
}
|
||||||
|
attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
|
||||||
|
new Object[]{ffFilename, bucket, key, e.getMessage()});
|
||||||
|
throw (e);
|
||||||
}
|
}
|
||||||
attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", "));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -205,10 +642,205 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
session.getProvenanceReporter().send(flowFile, url, millis);
|
session.getProvenanceReporter().send(flowFile, url, millis);
|
||||||
|
|
||||||
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
|
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
|
||||||
|
try {
|
||||||
|
removeLocalState(cacheKey);
|
||||||
|
} catch (IOException e) {
|
||||||
|
getLogger().info("Error trying to delete key {} from cache: {}",
|
||||||
|
new Object[]{cacheKey, e.getMessage()});
|
||||||
|
}
|
||||||
} catch (final ProcessException | AmazonClientException pe) {
|
} catch (final ProcessException | AmazonClientException pe) {
|
||||||
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
|
if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
|
||||||
flowFile = session.penalize(flowFile);
|
getLogger().info(pe.getMessage());
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.rollback();
|
||||||
|
} else {
|
||||||
|
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
|
||||||
|
flowFile = session.penalize(flowFile);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Lock s3BucketLock = new ReentrantLock();
|
||||||
|
private final AtomicLong lastS3AgeOff = new AtomicLong(0L);
|
||||||
|
private final DateFormat logFormat = new SimpleDateFormat();
|
||||||
|
|
||||||
|
protected void ageoffS3Uploads(final ProcessContext context, final AmazonS3Client s3, final long now) {
|
||||||
|
MultipartUploadListing oldUploads = getS3AgeoffListAndAgeoffLocalState(context, s3, now);
|
||||||
|
for (MultipartUpload upload : oldUploads.getMultipartUploads()) {
|
||||||
|
abortS3MultipartUpload(s3, oldUploads.getBucketName(), upload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MultipartUploadListing getS3AgeoffListAndAgeoffLocalState(final ProcessContext context, final AmazonS3Client s3, final long now) {
|
||||||
|
final long ageoff_interval = context.getProperty(MULTIPART_S3_AGEOFF_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
|
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
|
||||||
|
final Long maxAge = context.getProperty(MULTIPART_S3_MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
|
final long ageCutoff = now - maxAge;
|
||||||
|
|
||||||
|
final List<MultipartUpload> ageoffList = new ArrayList<>();
|
||||||
|
if ((lastS3AgeOff.get() < now - ageoff_interval) && s3BucketLock.tryLock()) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket);
|
||||||
|
MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
|
||||||
|
for (MultipartUpload upload : listing.getMultipartUploads()) {
|
||||||
|
long uploadTime = upload.getInitiated().getTime();
|
||||||
|
if (uploadTime < ageCutoff) {
|
||||||
|
ageoffList.add(upload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ageoff any local state
|
||||||
|
ageoffLocalState(ageCutoff);
|
||||||
|
lastS3AgeOff.set(System.currentTimeMillis());
|
||||||
|
} catch(AmazonClientException e) {
|
||||||
|
getLogger().error("Error checking S3 Multipart Upload list for {}: {}",
|
||||||
|
new Object[]{bucket, e.getMessage()});
|
||||||
|
} finally {
|
||||||
|
s3BucketLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MultipartUploadListing result = new MultipartUploadListing();
|
||||||
|
result.setBucketName(bucket);
|
||||||
|
result.setMultipartUploads(ageoffList);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void abortS3MultipartUpload(final AmazonS3Client s3, final String bucket, final MultipartUpload upload) {
|
||||||
|
final String uploadKey = upload.getKey();
|
||||||
|
final String uploadId = upload.getUploadId();
|
||||||
|
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(
|
||||||
|
bucket, uploadKey, uploadId);
|
||||||
|
try {
|
||||||
|
s3.abortMultipartUpload(abortRequest);
|
||||||
|
getLogger().info("Aborting out of date multipart upload, bucket {} key {} ID {}, initiated {}",
|
||||||
|
new Object[]{bucket, uploadKey, uploadId, logFormat.format(upload.getInitiated())});
|
||||||
|
} catch (AmazonClientException ace) {
|
||||||
|
getLogger().info("Error trying to abort multipart upload from bucket {} with key {} and ID {}: {}",
|
||||||
|
new Object[]{bucket, uploadKey, uploadId, ace.getMessage()});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class MultipartState implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 9006072180563519740L;
|
||||||
|
|
||||||
|
private static final String SEPARATOR = "#";
|
||||||
|
|
||||||
|
private String _uploadId;
|
||||||
|
private Long _filePosition;
|
||||||
|
private List<PartETag> _partETags;
|
||||||
|
private Long _partSize;
|
||||||
|
private StorageClass _storageClass;
|
||||||
|
private Long _contentLength;
|
||||||
|
private Long _timestamp;
|
||||||
|
|
||||||
|
public MultipartState() {
|
||||||
|
_uploadId = "";
|
||||||
|
_filePosition = 0L;
|
||||||
|
_partETags = new ArrayList<>();
|
||||||
|
_partSize = 0L;
|
||||||
|
_storageClass = StorageClass.Standard;
|
||||||
|
_contentLength = 0L;
|
||||||
|
_timestamp = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
// create from a previous toString() result
|
||||||
|
public MultipartState(String buf) {
|
||||||
|
String[] fields = buf.split(SEPARATOR);
|
||||||
|
_uploadId = fields[0];
|
||||||
|
_filePosition = Long.parseLong(fields[1]);
|
||||||
|
_partETags = new ArrayList<>();
|
||||||
|
for (String part : fields[2].split(",")) {
|
||||||
|
if (part != null && !part.isEmpty()) {
|
||||||
|
String[] partFields = part.split("/");
|
||||||
|
_partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_partSize = Long.parseLong(fields[3]);
|
||||||
|
_storageClass = StorageClass.fromValue(fields[4]);
|
||||||
|
_contentLength = Long.parseLong(fields[5]);
|
||||||
|
_timestamp = Long.parseLong(fields[6]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUploadId() {
|
||||||
|
return _uploadId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUploadId(String id) {
|
||||||
|
_uploadId = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getFilePosition() {
|
||||||
|
return _filePosition;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFilePosition(Long pos) {
|
||||||
|
_filePosition = pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<PartETag> getPartETags() {
|
||||||
|
return _partETags;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addPartETag(PartETag tag) {
|
||||||
|
_partETags.add(tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getPartSize() {
|
||||||
|
return _partSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPartSize(Long size) {
|
||||||
|
_partSize = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StorageClass getStorageClass() {
|
||||||
|
return _storageClass;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStorageClass(StorageClass aClass) {
|
||||||
|
_storageClass = aClass;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getContentLength() {
|
||||||
|
return _contentLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContentLength(Long length) {
|
||||||
|
_contentLength = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getTimestamp() {
|
||||||
|
return _timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTimestamp(Long timestamp) {
|
||||||
|
_timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder buf = new StringBuilder();
|
||||||
|
buf.append(_uploadId).append(SEPARATOR)
|
||||||
|
.append(_filePosition.toString()).append(SEPARATOR);
|
||||||
|
if (_partETags.size() > 0) {
|
||||||
|
boolean first = true;
|
||||||
|
for (PartETag tag : _partETags) {
|
||||||
|
if (!first) {
|
||||||
|
buf.append(",");
|
||||||
|
} else {
|
||||||
|
first = false;
|
||||||
|
}
|
||||||
|
buf.append(String.format("%d/%s", tag.getPartNumber(), tag.getETag()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
buf.append(SEPARATOR)
|
||||||
|
.append(_partSize.toString()).append(SEPARATOR)
|
||||||
|
.append(_storageClass.toString()).append(SEPARATOR)
|
||||||
|
.append(_contentLength.toString()).append(SEPARATOR)
|
||||||
|
.append(_timestamp.toString());
|
||||||
|
return buf.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,6 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.Iterator;
|
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -50,9 +49,14 @@ import static org.junit.Assert.fail;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractS3IT {
|
public abstract class AbstractS3IT {
|
||||||
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||||
protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021";
|
|
||||||
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
|
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
|
||||||
protected final static String REGION = "eu-west-1";
|
protected final static String REGION = "us-west-1";
|
||||||
|
// Adding REGION to bucket prevents errors of
|
||||||
|
// "A conflicting conditional operation is currently in progress against this resource."
|
||||||
|
// when bucket is rapidly added/deleted and consistency propogation causes this error.
|
||||||
|
// (Should not be necessary if REGION remains static, but added to prevent future frustration.)
|
||||||
|
// [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress]
|
||||||
|
protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION;
|
||||||
|
|
||||||
// Static so multiple Tests can use same client
|
// Static so multiple Tests can use same client
|
||||||
protected static AmazonS3Client client;
|
protected static AmazonS3Client client;
|
||||||
|
@ -99,8 +103,7 @@ public abstract class AbstractS3IT {
|
||||||
ObjectListing objectListing = client.listObjects(BUCKET_NAME);
|
ObjectListing objectListing = client.listObjects(BUCKET_NAME);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) {
|
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
|
||||||
S3ObjectSummary objectSummary = (S3ObjectSummary) iterator.next();
|
|
||||||
client.deleteObject(BUCKET_NAME, objectSummary.getKey());
|
client.deleteObject(BUCKET_NAME, objectSummary.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,29 +16,63 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.aws.s3;
|
package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.StorageClass;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
|
||||||
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
|
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||||
|
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import java.util.HashMap;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
import java.util.List;
|
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||||
import java.util.Map;
|
import com.amazonaws.services.s3.model.MultipartUpload;
|
||||||
|
import com.amazonaws.services.s3.model.MultipartUploadListing;
|
||||||
import static org.junit.Assert.assertEquals;
|
import com.amazonaws.services.s3.model.PartETag;
|
||||||
import static org.junit.Assert.assertTrue;
|
import com.amazonaws.services.s3.model.Region;
|
||||||
|
import com.amazonaws.services.s3.model.StorageClass;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides integration level testing with actual AWS S3 resources for {@link PutS3Object} and requires additional configuration and resources to work.
|
* Provides integration level testing with actual AWS S3 resources for {@link PutS3Object} and requires additional configuration and resources to work.
|
||||||
*/
|
*/
|
||||||
public class ITPutS3Object extends AbstractS3IT {
|
public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
|
    final static String TEST_ENDPOINT = "https://endpoint.com";
    // final static String TEST_TRANSIT_URI = "https://" + BUCKET_NAME + ".endpoint.com";
    final static String TEST_PARTSIZE_STRING = "50 mb";
    final static Long TEST_PARTSIZE_LONG = 50L * 1024L * 1024L;

    final static Long S3_MINIMUM_PART_SIZE = 50L * 1024L * 1024L;
    final static Long S3_MAXIMUM_OBJECT_SIZE = 5L * 1024L * 1024L * 1024L;

    final static Pattern reS3ETag = Pattern.compile("[0-9a-fA-f]{32,32}(-[0-9]+)?");

    @Test
    public void testSimplePut() throws IOException {
        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());

@@ -119,12 +153,12 @@ public class ITPutS3Object extends AbstractS3IT {

    @Test
    public void testPutInFolder() throws IOException {
        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);

        Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
        runner.assertValid();

        final Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", "folder/1.txt");

@@ -144,15 +178,30 @@ public class ITPutS3Object extends AbstractS3IT {

        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());

        int bytesNeeded = 55 * 1024 * 1024;
        StringBuilder bldr = new StringBuilder(bytesNeeded + 1000);
        for (int line = 0; line < 55; line++) {
            bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line));
        }
        String data55mb = bldr.toString();

        Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());

        final Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", "folder/2.txt");
        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);

        attrs.put("filename", "folder/3.txt");
        runner.enqueue(data55mb.getBytes(), attrs);

-        runner.run();
+        runner.run(2);

-        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2);

        FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
        Assert.assertEquals(StorageClass.ReducedRedundancy.toString(),
                file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
        FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1);
        Assert.assertEquals(StorageClass.ReducedRedundancy.toString(),
                file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
    }

    @Test
@@ -178,7 +227,7 @@ public class ITPutS3Object extends AbstractS3IT {
    public void testGetPropertyDescriptors() throws Exception {
        PutS3Object processor = new PutS3Object();
        List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 18, pd.size());
+        assertEquals("size should be eq", 22, pd.size());
        assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
        assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
        assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -198,4 +247,536 @@ public class ITPutS3Object extends AbstractS3IT {
        assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
        assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
    }

    @Test
    public void testDynamicProperty() throws IOException {
        final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp";
        final String DYNAMIC_ATTRIB_VALUE = "${now():toNumber()}";

        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
        PropertyDescriptor testAttrib = processor.getSupportedDynamicPropertyDescriptor(DYNAMIC_ATTRIB_KEY);
        runner.setProperty(testAttrib, DYNAMIC_ATTRIB_VALUE);

        final String FILE1_NAME = "file1";
        Map<String, String> attribs = new HashMap<>();
        attribs.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
        runner.enqueue("123".getBytes(), attribs);

        runner.assertValid();
        processor.getPropertyDescriptor(DYNAMIC_ATTRIB_KEY);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
        Assert.assertEquals(1, successFiles.size());
        MockFlowFile ff1 = successFiles.get(0);

        Long now = System.currentTimeMillis();
        String millisNow = Long.toString(now);
        String millisOneSecAgo = Long.toString(now - 1000L);
        String usermeta = ff1.getAttribute(PutS3Object.S3_USERMETA_ATTR_KEY);
        String[] usermetaLine0 = usermeta.split(System.lineSeparator())[0].split("=");
        String usermetaKey0 = usermetaLine0[0];
        String usermetaValue0 = usermetaLine0[1];
        Assert.assertEquals(DYNAMIC_ATTRIB_KEY, usermetaKey0);
        Assert.assertTrue(usermetaValue0.compareTo(millisOneSecAgo) >= 0 && usermetaValue0.compareTo(millisNow) <= 0);
    }

    @Test
    public void testProvenance() throws InitializationException {
        final String PROV1_FILE = "provfile1";

        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.KEY, "${filename}");

        Map<String, String> attribs = new HashMap<>();
        attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
        runner.enqueue("prov1 contents".getBytes(), attribs);

        runner.assertValid();
        runner.run();
        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
        Assert.assertEquals(1, successFiles.size());

        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
        Assert.assertEquals(1, provenanceEvents.size());
        ProvenanceEventRecord provRec1 = provenanceEvents.get(0);
        Assert.assertEquals(ProvenanceEventType.SEND, provRec1.getEventType());
        Assert.assertEquals(processor.getIdentifier(), provRec1.getComponentId());
        client.setRegion(Region.fromValue(REGION).toAWSRegion());
        String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString();
        Assert.assertEquals(targetUri, provRec1.getTransitUri());
        Assert.assertEquals(7, provRec1.getUpdatedAttributes().size());
        Assert.assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY));
    }

    @Test
    public void testStateDefaults() {
        PutS3Object.MultipartState state1 = new PutS3Object.MultipartState();
        Assert.assertEquals(state1.getUploadId(), "");
        Assert.assertEquals(state1.getFilePosition(), (Long) 0L);
        Assert.assertEquals(state1.getPartETags().size(), 0L);
        Assert.assertEquals(state1.getPartSize(), (Long) 0L);
        Assert.assertEquals(state1.getStorageClass().toString(), StorageClass.Standard.toString());
        Assert.assertEquals(state1.getContentLength(), (Long) 0L);
    }

    @Test
    public void testStateToString() throws IOException, InitializationException {
        final String target = "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4#20002#REDUCED_REDUNDANCY#30003#8675309";
        PutS3Object.MultipartState state2 = new PutS3Object.MultipartState();
        state2.setUploadId("UID-test1234567890");
        state2.setFilePosition(10001L);
        state2.setTimestamp(8675309L);
        for (Integer partNum = 1; partNum < 5; partNum++) {
            state2.addPartETag(new PartETag(partNum, "PartETag-" + partNum.toString()));
        }
        state2.setPartSize(20002L);
        state2.setStorageClass(StorageClass.ReducedRedundancy);
        state2.setContentLength(30003L);
        Assert.assertEquals(target, state2.toString());
    }

    @Test
    public void testEndpointOverride() {
        // remove leading "/" from filename to avoid duplicate separators
        final String TESTKEY = AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME.substring(1);

        final PutS3Object processor = new TestablePutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        final ProcessContext context = runner.getProcessContext();

        runner.setProperty(PutS3Object.ENDPOINT_OVERRIDE, TEST_ENDPOINT);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.KEY, TESTKEY);

        runner.run();

        Assert.assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString());
        Assert.assertEquals(TESTKEY, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString());
        Assert.assertEquals(TEST_ENDPOINT, context.getProperty(PutS3Object.ENDPOINT_OVERRIDE).toString());

        String s3url = ((TestablePutS3Object) processor).testable_getClient().getResourceUrl(BUCKET_NAME, TESTKEY);
        Assert.assertEquals(TEST_ENDPOINT + "/" + BUCKET_NAME + "/" + TESTKEY, s3url);
    }

    @Test
    public void testMultipartProperties() throws IOException {
        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        final ProcessContext context = runner.getProcessContext();

        runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,
                "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.KEY, AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME);

        Assert.assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString());
        Assert.assertEquals(SAMPLE_FILE_RESOURCE_NAME, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString());
        Assert.assertEquals(TEST_PARTSIZE_LONG.longValue(),
                context.getProperty(PutS3Object.MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue());
    }

    @Test
    public void testLocalStatePersistence() throws IOException {
        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
        final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
        final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
        final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";

        /*
         * store 3 versions of state
         */
        PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
        processor.persistLocalState(cacheKey1, state1orig);

        PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
        state2orig.setUploadId("1234");
        state2orig.setContentLength(1234L);
        processor.persistLocalState(cacheKey2, state2orig);

        PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
        state3orig.setUploadId("5678");
        state3orig.setContentLength(5678L);
        processor.persistLocalState(cacheKey3, state3orig);

        final List<MultipartUpload> uploadList = new ArrayList<>();
        final MultipartUpload upload1 = new MultipartUpload();
        upload1.setKey(key);
        upload1.setUploadId("");
        uploadList.add(upload1);
        final MultipartUpload upload2 = new MultipartUpload();
        upload2.setKey(key + "-v2");
        upload2.setUploadId("1234");
        uploadList.add(upload2);
        final MultipartUpload upload3 = new MultipartUpload();
        upload3.setKey(key + "-v3");
        upload3.setUploadId("5678");
        uploadList.add(upload3);
        final MultipartUploadListing uploadListing = new MultipartUploadListing();
        uploadListing.setMultipartUploads(uploadList);
        final MockAmazonS3Client mockClient = new MockAmazonS3Client();
        mockClient.setListing(uploadListing);

        /*
         * reload and validate stored state
         */
        final PutS3Object.MultipartState state1new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey1);
        Assert.assertEquals("", state1new.getUploadId());
        Assert.assertEquals(0L, state1new.getFilePosition().longValue());
        Assert.assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
        Assert.assertEquals(0L, state1new.getPartSize().longValue());
        Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
        Assert.assertEquals(0L, state1new.getContentLength().longValue());

        final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
        Assert.assertEquals("1234", state2new.getUploadId());
        Assert.assertEquals(0L, state2new.getFilePosition().longValue());
        Assert.assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
        Assert.assertEquals(0L, state2new.getPartSize().longValue());
        Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
        Assert.assertEquals(1234L, state2new.getContentLength().longValue());

        final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
        Assert.assertEquals("5678", state3new.getUploadId());
        Assert.assertEquals(0L, state3new.getFilePosition().longValue());
        Assert.assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
        Assert.assertEquals(0L, state3new.getPartSize().longValue());
        Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
        Assert.assertEquals(5678L, state3new.getContentLength().longValue());
    }

    @Test
    public void testStatePersistsETags() throws IOException {
        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
        final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
        final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
        final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";

        /*
         * store 3 versions of state
         */
        PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
        processor.persistLocalState(cacheKey1, state1orig);

        PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
        state2orig.setUploadId("1234");
        state2orig.setContentLength(1234L);
        processor.persistLocalState(cacheKey2, state2orig);

        PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
        state3orig.setUploadId("5678");
        state3orig.setContentLength(5678L);
        processor.persistLocalState(cacheKey3, state3orig);

        /*
         * persist state to caches so that
         *     1. v2 has 2 and then 4 tags
         *     2. v3 has 4 and then 2 tags
         */
        state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
        state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
        processor.persistLocalState(cacheKey2, state2orig);
        state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
        state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
        processor.persistLocalState(cacheKey2, state2orig);

        state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
        state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
        state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
        state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
        processor.persistLocalState(cacheKey3, state3orig);
        state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
        state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
        processor.persistLocalState(cacheKey3, state3orig);

        final List<MultipartUpload> uploadList = new ArrayList<>();
        final MultipartUpload upload1 = new MultipartUpload();
        upload1.setKey(key + "-bv2");
        upload1.setUploadId("1234");
        uploadList.add(upload1);
        final MultipartUpload upload2 = new MultipartUpload();
        upload2.setKey(key + "-bv3");
        upload2.setUploadId("5678");
        uploadList.add(upload2);
        final MultipartUploadListing uploadListing = new MultipartUploadListing();
        uploadListing.setMultipartUploads(uploadList);
        final MockAmazonS3Client mockClient = new MockAmazonS3Client();
        mockClient.setListing(uploadListing);

        /*
         * load state and validate that
         *     1. v2 restore shows 4 tags
         *     2. v3 restore shows 2 tags
         */
        final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
        Assert.assertEquals("1234", state2new.getUploadId());
        Assert.assertEquals(4, state2new.getPartETags().size());

        final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
        Assert.assertEquals("5678", state3new.getUploadId());
        Assert.assertEquals(2, state3new.getPartETags().size());
    }

    @Test
    public void testStateRemove() throws IOException {
        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
        final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";

        final List<MultipartUpload> uploadList = new ArrayList<>();
        final MultipartUpload upload1 = new MultipartUpload();
        upload1.setKey(key);
        upload1.setUploadId("1234");
        uploadList.add(upload1);
        final MultipartUploadListing uploadListing = new MultipartUploadListing();
        uploadListing.setMultipartUploads(uploadList);
        final MockAmazonS3Client mockClient = new MockAmazonS3Client();
        mockClient.setListing(uploadListing);

        /*
         * store state, retrieve and validate, remove and validate
         */
        PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
        stateOrig.setUploadId("1234");
        stateOrig.setContentLength(1234L);
        processor.persistLocalState(cacheKey, stateOrig);

        PutS3Object.MultipartState state1 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
        Assert.assertEquals("1234", state1.getUploadId());
        Assert.assertEquals(1234L, state1.getContentLength().longValue());

        processor.persistLocalState(cacheKey, null);
        PutS3Object.MultipartState state2 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
        Assert.assertNull(state2);
    }

    @Test
    public void testMultipartSmallerThanMinimum() throws IOException {
        final String FILE1_NAME = "file1";

        final byte[] megabyte = new byte[1024 * 1024];
        final Path tempFile = Files.createTempFile("s3mulitpart", "tmp");
        final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
        long tempByteCount = 0;
        for (int i = 0; i < 5; i++) {
            tempOut.write(megabyte);
            tempByteCount += megabyte.length;
        }
        tempOut.close();
        System.out.println("file size: " + tempByteCount);
        Assert.assertTrue(tempByteCount < S3_MINIMUM_PART_SIZE);

        Assert.assertTrue(megabyte.length < S3_MINIMUM_PART_SIZE);
        Assert.assertTrue(TEST_PARTSIZE_LONG >= S3_MINIMUM_PART_SIZE && TEST_PARTSIZE_LONG <= S3_MAXIMUM_OBJECT_SIZE);

        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);

        Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
        runner.enqueue(new FileInputStream(tempFile.toFile()), attributes);

        runner.assertValid();
        runner.run();
        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
        Assert.assertEquals(1, successFiles.size());
        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
        Assert.assertEquals(0, failureFiles.size());
        MockFlowFile ff1 = successFiles.get(0);
        Assert.assertEquals(PutS3Object.S3_API_METHOD_PUTOBJECT, ff1.getAttribute(PutS3Object.S3_API_METHOD_ATTR_KEY));
        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
        Assert.assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
        Assert.assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
        Assert.assertEquals(tempByteCount, ff1.getSize());
    }

    @Test
    public void testMultipartBetweenMinimumAndMaximum() throws IOException {
        final String FILE1_NAME = "file1";

        final byte[] megabyte = new byte[1024 * 1024];
        final Path tempFile = Files.createTempFile("s3mulitpart", "tmp");
        final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
        long tempByteCount = 0;
        for ( ; tempByteCount < TEST_PARTSIZE_LONG + 1; ) {
            tempOut.write(megabyte);
            tempByteCount += megabyte.length;
        }
        tempOut.close();
        System.out.println("file size: " + tempByteCount);
        Assert.assertTrue(tempByteCount > S3_MINIMUM_PART_SIZE && tempByteCount < S3_MAXIMUM_OBJECT_SIZE);
        Assert.assertTrue(tempByteCount > TEST_PARTSIZE_LONG);

        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, TEST_PARTSIZE_STRING);
        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);

        Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
        runner.enqueue(new FileInputStream(tempFile.toFile()), attributes);

        runner.assertValid();
        runner.run();
        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
        Assert.assertEquals(1, successFiles.size());
        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
        Assert.assertEquals(0, failureFiles.size());
        MockFlowFile ff1 = successFiles.get(0);
        Assert.assertEquals(PutS3Object.S3_API_METHOD_MULTIPARTUPLOAD, ff1.getAttribute(PutS3Object.S3_API_METHOD_ATTR_KEY));
        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
        Assert.assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
        Assert.assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
        Assert.assertEquals(tempByteCount, ff1.getSize());
    }

    @Ignore
    @Test
    public void testMultipartLargerThanObjectMaximum() throws IOException {
        final String FILE1_NAME = "file1";

        final byte[] megabyte = new byte[1024 * 1024];
        final Path tempFile = Files.createTempFile("s3mulitpart", "tmp");
        final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
        for (int i = 0; i < (S3_MAXIMUM_OBJECT_SIZE / 1024 / 1024 + 1); i++) {
            tempOut.write(megabyte);
        }
        tempOut.close();

        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);

        Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
        runner.enqueue(new FileInputStream(tempFile.toFile()), attributes);

        runner.assertValid();
        runner.run();
        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
        Assert.assertEquals(1, successFiles.size());
        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
        Assert.assertEquals(0, failureFiles.size());
        MockFlowFile ff1 = successFiles.get(0);
        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
        Assert.assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
        Assert.assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
        Assert.assertTrue(ff1.getSize() > S3_MAXIMUM_OBJECT_SIZE);
    }

    @Test
    public void testS3MultipartAgeoff() throws InterruptedException, IOException {
        final PutS3Object processor = new PutS3Object();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        final ProcessContext context = runner.getProcessContext();

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);

        // set check interval and age off to minimum values
        runner.setProperty(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL, "1 milli");
        runner.setProperty(PutS3Object.MULTIPART_S3_MAX_AGE, "1 milli");

        // create some dummy uploads
        for (Integer i = 0; i < 3; i++) {
            final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(
                    BUCKET_NAME, "file" + i.toString() + ".txt");
            try {
                client.initiateMultipartUpload(initiateRequest);
            } catch (AmazonClientException e) {
                Assert.fail("Failed to initiate upload: " + e.getMessage());
            }
        }

        // Age off is time dependent, so test has some timing constraints. This
        // sleep() delays long enough to satisfy interval and age intervals.
        Thread.sleep(2000L);

        // System millis are used for timing, but it is incremented on each
        // call to circumvent what appears to be caching in the AWS library.
        // The increments are 1000 millis because AWS returns upload
        // initiation times in whole seconds.
        Long now = System.currentTimeMillis();

        MultipartUploadListing uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now);
        Assert.assertEquals(3, uploadList.getMultipartUploads().size());

        MultipartUpload upload0 = uploadList.getMultipartUploads().get(0);
        processor.abortS3MultipartUpload(client, BUCKET_NAME, upload0);

        uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now + 1000);
        Assert.assertEquals(2, uploadList.getMultipartUploads().size());

        final Map<String, String> attrs = new HashMap<>();
        attrs.put("filename", "test-upload.txt");
        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
        runner.run();

        uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now + 2000);
        Assert.assertEquals(0, uploadList.getMultipartUploads().size());
    }

    private class MockAmazonS3Client extends AmazonS3Client {
        MultipartUploadListing listing;

        public void setListing(MultipartUploadListing newlisting) {
            listing = newlisting;
        }

        @Override
        public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest)
                throws AmazonClientException, AmazonServiceException {
            return listing;
        }
    }

    public class TestablePutS3Object extends PutS3Object {
        public AmazonS3Client testable_getClient() {
            return this.getClient();
        }
    }
}
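
The testStateToString case above pins down the `#`-delimited layout produced by MultipartState.toString(): uploadId, file position, a comma-separated list of partNumber/ETag entries, part size, storage class, content length, and timestamp. As a rough illustration of that layout only, not code from this commit, the sketch below parses such a string back into its fields; the ParsedMultipartState class and all of its member names are hypothetical.

// Illustrative sketch only (not part of this commit): parse the '#'-delimited
// serialized form exercised by testStateToString. Class and field names are invented.
import java.util.ArrayList;
import java.util.List;

public class ParsedMultipartState {
    String uploadId;
    long filePosition;
    final List<String> partETags = new ArrayList<>();  // entries like "1/PartETag-1"
    long partSize;
    String storageClass;
    long contentLength;
    long timestamp;

    static ParsedMultipartState fromString(String s) {
        // Fields are separated by '#'; ETag entries within field 2 are separated by ','.
        final String[] fields = s.split("#");
        final ParsedMultipartState state = new ParsedMultipartState();
        state.uploadId = fields[0];
        state.filePosition = Long.parseLong(fields[1]);
        if (!fields[2].isEmpty()) {
            for (String tag : fields[2].split(",")) {
                state.partETags.add(tag);
            }
        }
        state.partSize = Long.parseLong(fields[3]);
        state.storageClass = fields[4];
        state.contentLength = Long.parseLong(fields[5]);
        state.timestamp = Long.parseLong(fields[6]);
        return state;
    }

    public static void main(String[] args) {
        final ParsedMultipartState p = ParsedMultipartState.fromString(
                "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4"
                + "#20002#REDUCED_REDUNDANCY#30003#8675309");
        System.out.println(p.uploadId + " parts=" + p.partETags.size());  // UID-test1234567890 parts=4
    }
}

Round-tripping the exact target string from the test is a convenient sanity check on the assumed field order; if the serialized form ever changes, this sketch would need to change with it.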
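
The testMultipartSmallerThanMinimum and testMultipartBetweenMinimumAndMaximum cases assert only the observable outcome (the API-method attribute and the ETag shape); they do not show the size decision itself. The sketch below is a minimal, hypothetical illustration of that kind of threshold decision and of how a part count falls out of a fixed part size; the class and the useMultipart/partCount helpers are invented for this note and are not the processor's API.

// Minimal illustration only; not the processor's implementation. Assumes a single
// 50 MB threshold (mirroring the "50 mb" TEST_PARTSIZE_STRING used by the tests)
// separates the single-call PutObject path from the multipart path.
public class MultipartDecisionSketch {
    static final long MB = 1024L * 1024L;
    static final long THRESHOLD = 50L * MB;  // assumed threshold, mirrors TEST_PARTSIZE_LONG
    static final long PART_SIZE = 50L * MB;  // assumed part size, mirrors TEST_PARTSIZE_LONG

    /** Hypothetical helper: true if a flowfile of this size would be uploaded in parts. */
    static boolean useMultipart(long contentLength) {
        return contentLength >= THRESHOLD;
    }

    /** Parts needed for a multipart upload of this size, at PART_SIZE bytes per part. */
    static long partCount(long contentLength) {
        return (contentLength + PART_SIZE - 1) / PART_SIZE;  // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(useMultipart(5 * MB));   // false -> single PutObject (cf. testMultipartSmallerThanMinimum)
        System.out.println(useMultipart(51 * MB));  // true  -> multipart (cf. testMultipartBetweenMinimumAndMaximum)
        System.out.println(partCount(51 * MB));     // 2: one full 50 MB part plus a 1 MB remainder
    }
}

Under these assumptions the 5 MB file in the first test stays on the single-request path, while the 51 MB file in the second test crosses the threshold and is split into two parts, which matches the attribute values those tests assert.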