diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 3c35658a2c..0c033c9fca 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -17,18 +17,28 @@ package org.apache.nifi.processors.aws.s3; import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; +import java.nio.file.Files; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -42,6 +52,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; @@ -50,31 +61,66 @@ import org.apache.nifi.processor.util.StandardValidators; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.StorageClass; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; @SupportsBatching @SeeAlso({FetchS3Object.class, DeleteS3Object.class}) @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) -@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") -@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object", 
- description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true) +@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket\n" + + "The upload uses either the PutS3Object method or PutS3MultipartUpload methods. The PutS3Object method " + + "send the file in a single synchronous call, but it has a 5GB size limit. Larger files are sent using the " + + "multipart upload methods that initiate, transfer the parts, and complete an upload. This multipart process " + + "saves state after each step so that a large upload can be resumed with minimal loss if the processor or " + + "cluster is stopped and restarted.\n" + + "A multipart upload consists of three steps\n" + + " 1) initiate upload,\n" + + " 2) upload the parts, and\n" + + " 3) complete the upload.\n" + + "For multipart uploads, the processor saves state locally tracking the upload ID and parts uploaded, which " + + "must both be provided to complete the upload.\n" + + "The AWS libraries select an endpoint URL based on the AWS region, but this can be overridden with the " + + "'Endpoint Override URL' property for use with other S3-compatible endpoints.\n" + + "The S3 API specifies that the maximum file size for a PutS3Object upload is 5GB. It also requires that " + + "parts in a multipart upload must be at least 5MB in size, except for the last part. These limits are " + + "establish the bounds for the Multipart Upload Threshold and Part Size properties.") +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", + value = "The value of a User-Defined Metadata field to add to the S3 Object", + description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", + supportsExpressionLanguage = true) @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") @WritesAttributes({ @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), - @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"), @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), - @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set") + @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " + + "the S3 object, if one is set"), + @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " + + "the S3 object, if any was set") }) public class PutS3Object extends AbstractS3Processor { + public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L; + public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L; + public static final String PERSISTENCE_ROOT = "conf/state/"; + public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() .name("Expiration Time Rule") .required(false) @@ -89,9 +135,51 @@ public class PutS3Object extends AbstractS3Processor { 
.defaultValue(StorageClass.Standard.name()) .build(); + public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder() + .name("Multipart Threshold") + .description("Specifies the file size threshold for switching from the PutS3Object API to the " + + "PutS3MultipartUpload API. Flow files bigger than this limit will be sent using the stateful " + + "multipart process.\n" + + "The valid range is 50MB to 5GB.") + .required(true) + .defaultValue("5 GB") + .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE)) + .build(); + + public static final PropertyDescriptor MULTIPART_PART_SIZE = new PropertyDescriptor.Builder() + .name("Multipart Part Size") + .description("Specifies the part size for use when the PutS3MultipartUpload API is used.\n" + + "Flow files will be broken into chunks of this size for the upload process, but the last part " + + "sent can be smaller since it is not padded.\n" + + "The valid range is 50MB to 5GB.") + .required(true) + .defaultValue("5 GB") + .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE)) + .build(); + + public static final PropertyDescriptor MULTIPART_S3_AGEOFF_INTERVAL = new PropertyDescriptor.Builder() + .name("Multipart Upload AgeOff Interval") + .description("Specifies the interval at which existing multipart uploads in AWS S3 will be evaluated " + + "for ageoff. When the processor is triggered it will initiate the ageoff evaluation if this interval has been " + + "exceeded.") + .required(true) + .defaultValue("60 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor MULTIPART_S3_MAX_AGE = new PropertyDescriptor.Builder() + .name("Multipart Upload Max Age Threshold") + .description("Specifies the maximum age for existing multipart uploads in AWS S3. 
When the ageoff " + + "process occurs, any upload older than this threshold will be aborted.") + .required(true) + .defaultValue("7 days") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE)); final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; @@ -102,6 +190,11 @@ public class PutS3Object extends AbstractS3Processor { final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass"; final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class"; final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata"; + final static String S3_API_METHOD_ATTR_KEY = "s3.apimethod"; + final static String S3_API_METHOD_PUTOBJECT = "putobject"; + final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload"; + + final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload"; @Override protected List getSupportedPropertyDescriptors() { @@ -118,6 +211,144 @@ public class PutS3Object extends AbstractS3Processor { .build(); } + protected File getPersistenceFile() { + return new File(PERSISTENCE_ROOT + getIdentifier()); + } + + protected boolean localUploadExistsInS3(final AmazonS3Client s3, final String bucket, final MultipartState localState) { + ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket); + MultipartUploadListing listing = s3.listMultipartUploads(listRequest); + for (MultipartUpload upload : listing.getMultipartUploads()) { + if (upload.getUploadId().equals(localState.getUploadId())) { + return true; + } + } + return false; + } + + protected synchronized MultipartState getLocalStateIfInS3(final AmazonS3Client s3, final String bucket, + final String s3ObjectKey) throws IOException { + MultipartState currState = getLocalState(s3ObjectKey); + if (currState == null) { + return null; + } + if (localUploadExistsInS3(s3, bucket, currState)) { + getLogger().info("Local state for {} loaded with uploadId {} and {} partETags", + new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()}); + return currState; + } else { + getLogger().info("Local state for {} with uploadId {} does not exist in S3, deleting local state", + new Object[]{s3ObjectKey, currState.getUploadId()}); + persistLocalState(s3ObjectKey, null); + return null; + } + } + + protected synchronized MultipartState getLocalState(final String s3ObjectKey) throws IOException { + // get local state if it exists + final File persistenceFile = getPersistenceFile(); + + if (persistenceFile.exists()) { + final Properties props = new Properties(); + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } catch (IOException ioe) { + getLogger().warn("Failed to recover local state for {} due to {}. 
Assuming no local state and " + + "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()}); + return null; + } + if (props.containsKey(s3ObjectKey)) { + final String localSerialState = props.getProperty(s3ObjectKey); + if (localSerialState != null) { + try { + return new MultipartState(localSerialState); + } catch (final RuntimeException rte) { + getLogger().warn("Failed to recover local state for {} due to corrupt data in state.", new Object[]{s3ObjectKey, rte.getMessage()}); + return null; + } + } + } + } + return null; + } + + protected synchronized void persistLocalState(final String s3ObjectKey, final MultipartState currState) throws IOException { + final String currStateStr = (currState == null) ? null : currState.toString(); + final File persistenceFile = getPersistenceFile(); + final File parentDir = persistenceFile.getParentFile(); + if (!parentDir.exists() && !parentDir.mkdirs()) { + throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " + + "could not be created."); + } + final Properties props = new Properties(); + if (persistenceFile.exists()) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } + } + if (currStateStr != null) { + currState.setTimestamp(System.currentTimeMillis()); + props.setProperty(s3ObjectKey, currStateStr); + } else { + props.remove(s3ObjectKey); + } + + if (props.size() > 0) { + try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { + props.store(fos, null); + } catch (IOException ioe) { + getLogger().error("Could not store state {} due to {}.", + new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()}); + } + } else { + if (persistenceFile.exists()) { + try { + Files.delete(persistenceFile.toPath()); + } catch (IOException ioe) { + getLogger().error("Could not remove state file {} due to {}.", + new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()}); + } + } + } + } + + protected synchronized void removeLocalState(final String s3ObjectKey) throws IOException { + persistLocalState(s3ObjectKey, null); + } + + private synchronized void ageoffLocalState(long ageCutoff) { + // get local state if it exists + final File persistenceFile = getPersistenceFile(); + if (persistenceFile.exists()) { + Properties props = new Properties(); + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } catch (final IOException ioe) { + getLogger().warn("Failed to ageoff remove local state due to {}", + new Object[]{ioe.getMessage()}); + return; + } + for (Entry entry: props.entrySet()) { + final String key = (String) entry.getKey(); + final String localSerialState = props.getProperty(key); + if (localSerialState != null) { + final MultipartState state = new MultipartState(localSerialState); + if (state.getTimestamp() < ageCutoff) { + getLogger().warn("Removing local state for {} due to exceeding ageoff time", + new Object[]{key}); + try { + removeLocalState(key); + } catch (final IOException ioe) { + getLogger().warn("Failed to remove local state for {} due to {}", + new Object[]{key, ioe.getMessage()}); + + } + } + } + } + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); @@ -129,13 +360,28 @@ public class PutS3Object extends AbstractS3Processor { final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String cacheKey = getIdentifier() + "/" + bucket + "/" + key; final AmazonS3Client s3 = getClient(); final FlowFile ff = flowFile; final Map attributes = new HashMap<>(); + final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key()); attributes.put(S3_BUCKET_KEY, bucket); attributes.put(S3_OBJECT_KEY, key); + final Long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue(); + final Long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue(); + + final long now = System.currentTimeMillis(); + + /* + * If necessary, run age off for existing uploads in AWS S3 and local state + */ + ageoffS3Uploads(context, s3, now); + + /* + * Then + */ try { session.read(flowFile, new InputStreamCallback() { @Override @@ -145,7 +391,8 @@ public class PutS3Object extends AbstractS3Processor { objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); objectMetadata.setContentLength(ff.getSize()); - final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue(); + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID) + .evaluateAttributeExpressions(ff).getValue(); if (expirationRule != null) { objectMetadata.setExpirationTimeRuleId(expirationRule); } @@ -153,7 +400,8 @@ public class PutS3Object extends AbstractS3Processor { final Map userMetadata = new HashMap<>(); for (final Map.Entry entry : context.getProperties().entrySet()) { if (entry.getKey().isDynamic()) { - final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue(); + final String value = context.getProperty( + entry.getKey()).evaluateAttributeExpressions(ff).getValue(); userMetadata.put(entry.getKey().getName(), value); } } @@ -162,34 +410,223 @@ public class PutS3Object extends AbstractS3Processor { objectMetadata.setUserMetadata(userMetadata); } - final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); - request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); - final AccessControlList acl = createACL(context, ff); - if (acl != null) { - request.setAccessControlList(acl); - } - - final PutObjectResult result = s3.putObject(request); - if (result.getVersionId() != null) { - attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId()); - } - - attributes.put(S3_ETAG_ATTR_KEY, result.getETag()); - - final Date expiration = result.getExpirationTime(); - if (expiration != null) { - attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString()); - } - if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { - attributes.put(S3_STORAGECLASS_ATTR_KEY, - result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); - } - if (userMetadata.size() > 0) { - List pairs = new ArrayList(); - for (String userKey : userMetadata.keySet()) { - pairs.add(userKey + "=" + userMetadata.get(userKey)); + if (ff.getSize() <= multipartThreshold) { + //---------------------------------------- + // single part upload + //---------------------------------------- + final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); + request.setStorageClass( + StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); + final AccessControlList acl = createACL(context, ff); + if (acl != null) { + 
request.setAccessControlList(acl); + } + + try { + final PutObjectResult result = s3.putObject(request); + if (result.getVersionId() != null) { + attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId()); + } + if (result.getETag() != null) { + attributes.put(S3_ETAG_ATTR_KEY, result.getETag()); + } + if (result.getExpirationTime() != null) { + attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString()); + } + if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { + attributes.put(S3_STORAGECLASS_ATTR_KEY, + result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); + } + if (userMetadata.size() > 0) { + StringBuilder userMetaBldr = new StringBuilder(); + for (String userKey : userMetadata.keySet()) { + userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey)); + } + attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString()); + } + attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT); + } catch (AmazonClientException e) { + getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}", + new Object[]{ffFilename, bucket, key, e.getMessage()}); + throw (e); + } + } else { + //---------------------------------------- + // multipart upload + //---------------------------------------- + + // load or create persistent state + //------------------------------------------------------------ + MultipartState currentState; + try { + currentState = getLocalStateIfInS3(s3, bucket, cacheKey); + if (currentState != null) { + if (currentState.getPartETags().size() > 0) { + final PartETag lastETag = currentState.getPartETags().get( + currentState.getPartETags().size() - 1); + getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " + + "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " + + "contentLength='{}' partsLoaded={} lastPart={}/{}", + new Object[]{ffFilename, bucket, key, currentState.getUploadId(), + currentState.getFilePosition(), currentState.getPartSize(), + currentState.getStorageClass().toString(), + currentState.getContentLength(), + currentState.getPartETags().size(), + Integer.toString(lastETag.getPartNumber()), + lastETag.getETag()}); + } else { + getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " + + "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " + + "contentLength='{}' no partsLoaded", + new Object[]{ffFilename, bucket, key, currentState.getUploadId(), + currentState.getFilePosition(), currentState.getPartSize(), + currentState.getStorageClass().toString(), + currentState.getContentLength()}); + } + } else { + currentState = new MultipartState(); + currentState.setPartSize(multipartPartSize); + currentState.setStorageClass( + StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); + currentState.setContentLength(ff.getSize()); + persistLocalState(cacheKey, currentState); + getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'", + new Object[]{ffFilename, bucket, key}); + } + } catch (IOException e) { + getLogger().error("IOException initiating cache state while processing flow files: " + + e.getMessage()); + throw (e); + } + + // initiate multipart upload or find position in file + //------------------------------------------------------------ + if (currentState.getUploadId().isEmpty()) { + final InitiateMultipartUploadRequest initiateRequest = + new InitiateMultipartUploadRequest(bucket, key, objectMetadata); + 
initiateRequest.setStorageClass(currentState.getStorageClass()); + final AccessControlList acl = createACL(context, ff); + if (acl != null) { + initiateRequest.setAccessControlList(acl); + } + try { + final InitiateMultipartUploadResult initiateResult = + s3.initiateMultipartUpload(initiateRequest); + currentState.setUploadId(initiateResult.getUploadId()); + currentState.getPartETags().clear(); + try { + persistLocalState(cacheKey, currentState); + } catch (Exception e) { + getLogger().info("Exception saving cache state while processing flow file: " + + e.getMessage()); + throw(new ProcessException("Exception saving cache state", e)); + } + getLogger().info("Success initiating upload flowfile={} available={} position={} " + + "length={} bucket={} key={} uploadId={}", + new Object[]{ffFilename, in.available(), currentState.getFilePosition(), + currentState.getContentLength(), bucket, key, + currentState.getUploadId()}); + if (initiateResult.getUploadId() != null) { + attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId()); + } + } catch (AmazonClientException e) { + getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}", + new Object[]{ffFilename, bucket, key, e.getMessage()}); + throw(e); + } + } else { + if (currentState.getFilePosition() > 0) { + try { + final long skipped = in.skip(currentState.getFilePosition()); + if (skipped != currentState.getFilePosition()) { + getLogger().info("Failure skipping to resume upload flowfile={} " + + "bucket={} key={} position={} skipped={}", + new Object[]{ffFilename, bucket, key, + currentState.getFilePosition(), skipped}); + } + } catch (Exception e) { + getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " + + "key={} position={} reason={}", + new Object[]{ffFilename, bucket, key, currentState.getFilePosition(), + e.getMessage()}); + throw(new ProcessException(e)); + } + } + } + + // upload parts + //------------------------------------------------------------ + long thisPartSize; + for (int part = currentState.getPartETags().size() + 1; + currentState.getFilePosition() < currentState.getContentLength(); part++) { + if (!PutS3Object.this.isScheduled()) { + throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename + + " part=" + part + " uploadId=" + currentState.getUploadId()); + } + thisPartSize = Math.min(currentState.getPartSize(), + (currentState.getContentLength() - currentState.getFilePosition())); + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(bucket) + .withKey(key) + .withUploadId(currentState.getUploadId()) + .withInputStream(in) + .withPartNumber(part) + .withPartSize(thisPartSize); + try { + UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest); + currentState.addPartETag(uploadPartResult.getPartETag()); + currentState.setFilePosition(currentState.getFilePosition() + thisPartSize); + try { + persistLocalState(cacheKey, currentState); + } catch (Exception e) { + getLogger().info("Exception saving cache state processing flow file: " + + e.getMessage()); + } + getLogger().info("Success uploading part flowfile={} part={} available={} " + + "etag={} uploadId={}", new Object[]{ffFilename, part, in.available(), + uploadPartResult.getETag(), currentState.getUploadId()}); + } catch (AmazonClientException e) { + getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " + + "reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()}); + throw (e); + } + } + + // complete multipart upload 
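/*
 * Sketch of the persisted state used above (illustration only, not added by this patch): each
 * persistLocalState() call rewrites the java.util.Properties file returned by getPersistenceFile()
 * (conf/state/<processor-id>), keyed by the cacheKey "<processorId>/<bucket>/<key>" with the value
 * MultipartState.toString(), e.g.
 *
 *     uploadId#filePosition#1/etag1,2/etag2#partSize#STANDARD#contentLength#timestamp
 *
 * so a restarted processor can rebuild the part ETag list and resume from filePosition, roughly:
 *
 *     final Properties props = new Properties();
 *     try (final FileInputStream fis = new FileInputStream(getPersistenceFile())) {
 *         props.load(fis);
 *     }
 *     final MultipartState resumed = new MultipartState(props.getProperty(cacheKey));
 */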
+ //------------------------------------------------------------ + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( + bucket, key, currentState.getUploadId(), currentState.getPartETags()); + try { + CompleteMultipartUploadResult completeResult = + s3.completeMultipartUpload(completeRequest); + getLogger().info("Success completing upload flowfile={} etag={} uploadId={}", + new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()}); + if (completeResult.getVersionId() != null) { + attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId()); + } + if (completeResult.getETag() != null) { + attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag()); + } + if (completeResult.getExpirationTime() != null) { + attributes.put(S3_EXPIRATION_ATTR_KEY, + completeResult.getExpirationTime().toString()); + } + if (currentState.getStorageClass() != null) { + attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString()); + } + if (userMetadata.size() > 0) { + StringBuilder userMetaBldr = new StringBuilder(); + for (String userKey : userMetadata.keySet()) { + userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey)); + } + attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString()); + } + attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD); + } catch (AmazonClientException e) { + getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}", + new Object[]{ffFilename, bucket, key, e.getMessage()}); + throw (e); } - attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", ")); } } } @@ -205,10 +642,205 @@ public class PutS3Object extends AbstractS3Processor { session.getProvenanceReporter().send(flowFile, url, millis); getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis}); + try { + removeLocalState(cacheKey); + } catch (IOException e) { + getLogger().info("Error trying to delete key {} from cache: {}", + new Object[]{cacheKey, e.getMessage()}); + } } catch (final ProcessException | AmazonClientException pe) { - getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); + if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) { + getLogger().info(pe.getMessage()); + session.rollback(); + } else { + getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + + } + + private final Lock s3BucketLock = new ReentrantLock(); + private final AtomicLong lastS3AgeOff = new AtomicLong(0L); + private final DateFormat logFormat = new SimpleDateFormat(); + + protected void ageoffS3Uploads(final ProcessContext context, final AmazonS3Client s3, final long now) { + MultipartUploadListing oldUploads = getS3AgeoffListAndAgeoffLocalState(context, s3, now); + for (MultipartUpload upload : oldUploads.getMultipartUploads()) { + abortS3MultipartUpload(s3, oldUploads.getBucketName(), upload); + } + } + + protected MultipartUploadListing getS3AgeoffListAndAgeoffLocalState(final ProcessContext context, final AmazonS3Client s3, final long now) { + final long ageoff_interval = context.getProperty(MULTIPART_S3_AGEOFF_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); + final Long maxAge = 
context.getProperty(MULTIPART_S3_MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long ageCutoff = now - maxAge; + + final List ageoffList = new ArrayList<>(); + if ((lastS3AgeOff.get() < now - ageoff_interval) && s3BucketLock.tryLock()) { + try { + + ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket); + MultipartUploadListing listing = s3.listMultipartUploads(listRequest); + for (MultipartUpload upload : listing.getMultipartUploads()) { + long uploadTime = upload.getInitiated().getTime(); + if (uploadTime < ageCutoff) { + ageoffList.add(upload); + } + } + + // ageoff any local state + ageoffLocalState(ageCutoff); + lastS3AgeOff.set(System.currentTimeMillis()); + } catch(AmazonClientException e) { + getLogger().error("Error checking S3 Multipart Upload list for {}: {}", + new Object[]{bucket, e.getMessage()}); + } finally { + s3BucketLock.unlock(); + } + } + MultipartUploadListing result = new MultipartUploadListing(); + result.setBucketName(bucket); + result.setMultipartUploads(ageoffList); + return result; + } + + protected void abortS3MultipartUpload(final AmazonS3Client s3, final String bucket, final MultipartUpload upload) { + final String uploadKey = upload.getKey(); + final String uploadId = upload.getUploadId(); + final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest( + bucket, uploadKey, uploadId); + try { + s3.abortMultipartUpload(abortRequest); + getLogger().info("Aborting out of date multipart upload, bucket {} key {} ID {}, initiated {}", + new Object[]{bucket, uploadKey, uploadId, logFormat.format(upload.getInitiated())}); + } catch (AmazonClientException ace) { + getLogger().info("Error trying to abort multipart upload from bucket {} with key {} and ID {}: {}", + new Object[]{bucket, uploadKey, uploadId, ace.getMessage()}); + } + } + + protected static class MultipartState implements Serializable { + + private static final long serialVersionUID = 9006072180563519740L; + + private static final String SEPARATOR = "#"; + + private String _uploadId; + private Long _filePosition; + private List _partETags; + private Long _partSize; + private StorageClass _storageClass; + private Long _contentLength; + private Long _timestamp; + + public MultipartState() { + _uploadId = ""; + _filePosition = 0L; + _partETags = new ArrayList<>(); + _partSize = 0L; + _storageClass = StorageClass.Standard; + _contentLength = 0L; + _timestamp = System.currentTimeMillis(); + } + + // create from a previous toString() result + public MultipartState(String buf) { + String[] fields = buf.split(SEPARATOR); + _uploadId = fields[0]; + _filePosition = Long.parseLong(fields[1]); + _partETags = new ArrayList<>(); + for (String part : fields[2].split(",")) { + if (part != null && !part.isEmpty()) { + String[] partFields = part.split("/"); + _partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1])); + } + } + _partSize = Long.parseLong(fields[3]); + _storageClass = StorageClass.fromValue(fields[4]); + _contentLength = Long.parseLong(fields[5]); + _timestamp = Long.parseLong(fields[6]); + } + + public String getUploadId() { + return _uploadId; + } + + public void setUploadId(String id) { + _uploadId = id; + } + + public Long getFilePosition() { + return _filePosition; + } + + public void setFilePosition(Long pos) { + _filePosition = pos; + } + + public List getPartETags() { + return _partETags; + } + + public void addPartETag(PartETag tag) { + _partETags.add(tag); + } + + public Long getPartSize() { + return _partSize; + } + + public 
void setPartSize(Long size) { + _partSize = size; + } + + public StorageClass getStorageClass() { + return _storageClass; + } + + public void setStorageClass(StorageClass aClass) { + _storageClass = aClass; + } + + public Long getContentLength() { + return _contentLength; + } + + public void setContentLength(Long length) { + _contentLength = length; + } + + public Long getTimestamp() { + return _timestamp; + } + + public void setTimestamp(Long timestamp) { + _timestamp = timestamp; + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(_uploadId).append(SEPARATOR) + .append(_filePosition.toString()).append(SEPARATOR); + if (_partETags.size() > 0) { + boolean first = true; + for (PartETag tag : _partETags) { + if (!first) { + buf.append(","); + } else { + first = false; + } + buf.append(String.format("%d/%s", tag.getPartNumber(), tag.getETag())); + } + } + buf.append(SEPARATOR) + .append(_partSize.toString()).append(SEPARATOR) + .append(_storageClass.toString()).append(SEPARATOR) + .append(_contentLength.toString()).append(SEPARATOR) + .append(_timestamp.toString()); + return buf.toString(); } } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index d0dafb8273..11fdffe2ff 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -37,7 +37,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Iterator; import static org.junit.Assert.fail; @@ -50,9 +49,14 @@ import static org.junit.Assert.fail; */ public abstract class AbstractS3IT { protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; - protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; - protected final static String REGION = "eu-west-1"; + protected final static String REGION = "us-west-1"; + // Adding REGION to bucket prevents errors of + // "A conflicting conditional operation is currently in progress against this resource." + // when bucket is rapidly added/deleted and consistency propogation causes this error. + // (Should not be necessary if REGION remains static, but added to prevent future frustration.) 
+ // [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress] + protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION; // Static so multiple Tests can use same client protected static AmazonS3Client client; @@ -99,8 +103,7 @@ public abstract class AbstractS3IT { ObjectListing objectListing = client.listObjects(BUCKET_NAME); while (true) { - for (Iterator iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) { - S3ObjectSummary objectSummary = (S3ObjectSummary) iterator.next(); + for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { client.deleteObject(BUCKET_NAME, objectSummary.getKey()); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index f2e938ceee..41d504f121 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -16,29 +16,63 @@ */ package org.apache.nifi.processors.aws.s3; -import com.amazonaws.services.s3.model.StorageClass; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.MultipartUpload; +import com.amazonaws.services.s3.model.MultipartUploadListing; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.Region; +import com.amazonaws.services.s3.model.StorageClass; /** * Provides integration level testing with actual AWS S3 resources for {@link PutS3Object} and requires additional configuration and resources to work. 
*/ public class ITPutS3Object extends AbstractS3IT { + final static String TEST_ENDPOINT = "https://endpoint.com"; + // final static String TEST_TRANSIT_URI = "https://" + BUCKET_NAME + ".endpoint.com"; + final static String TEST_PARTSIZE_STRING = "50 mb"; + final static Long TEST_PARTSIZE_LONG = 50L * 1024L * 1024L; + + final static Long S3_MINIMUM_PART_SIZE = 50L * 1024L * 1024L; + final static Long S3_MAXIMUM_OBJECT_SIZE = 5L * 1024L * 1024L * 1024L; + + final static Pattern reS3ETag = Pattern.compile("[0-9a-fA-f]{32,32}(-[0-9]+)?"); + @Test public void testSimplePut() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); @@ -119,12 +153,12 @@ public class ITPutS3Object extends AbstractS3IT { @Test public void testPutInFolder() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); - runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + runner.assertValid(); final Map attrs = new HashMap<>(); attrs.put("filename", "folder/1.txt"); @@ -144,15 +178,30 @@ public class ITPutS3Object extends AbstractS3IT { runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name()); + int bytesNeeded = 55 * 1024 * 1024; + StringBuilder bldr = new StringBuilder(bytesNeeded + 1000); + for (int line = 0; line < 55; line++) { + bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line)); + } + String data55mb = bldr.toString(); + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); final Map attrs = new HashMap<>(); attrs.put("filename", "folder/2.txt"); runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + attrs.put("filename", "folder/3.txt"); + runner.enqueue(data55mb.getBytes(), attrs); - runner.run(); + runner.run(2); - runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2); + FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0); + Assert.assertEquals(StorageClass.ReducedRedundancy.toString(), + file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); + FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1); + Assert.assertEquals(StorageClass.ReducedRedundancy.toString(), + file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); } @Test @@ -178,7 +227,7 @@ public class ITPutS3Object extends AbstractS3IT { public void testGetPropertyDescriptors() throws Exception { PutS3Object processor = new PutS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 18, pd.size()); + assertEquals("size should be eq", 22, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -198,4 +247,536 @@ public class ITPutS3Object extends AbstractS3IT { assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); } -} \ No newline at end of file + + @Test + public void testDynamicProperty() throws IOException { + final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp"; + final String DYNAMIC_ATTRIB_VALUE = "${now():toNumber()}"; + + final PutS3Object processor = new 
PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); + PropertyDescriptor testAttrib = processor.getSupportedDynamicPropertyDescriptor(DYNAMIC_ATTRIB_KEY); + runner.setProperty(testAttrib, DYNAMIC_ATTRIB_VALUE); + + final String FILE1_NAME = "file1"; + Map attribs = new HashMap<>(); + attribs.put(CoreAttributes.FILENAME.key(), FILE1_NAME); + runner.enqueue("123".getBytes(), attribs); + + runner.assertValid(); + processor.getPropertyDescriptor(DYNAMIC_ATTRIB_KEY); + runner.run(); + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS); + final List successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + Assert.assertEquals(1, successFiles.size()); + MockFlowFile ff1 = successFiles.get(0); + + Long now = System.currentTimeMillis(); + String millisNow = Long.toString(now); + String millisOneSecAgo = Long.toString(now - 1000L); + String usermeta = ff1.getAttribute(PutS3Object.S3_USERMETA_ATTR_KEY); + String[] usermetaLine0 = usermeta.split(System.lineSeparator())[0].split("="); + String usermetaKey0 = usermetaLine0[0]; + String usermetaValue0 = usermetaLine0[1]; + Assert.assertEquals(DYNAMIC_ATTRIB_KEY, usermetaKey0); + Assert.assertTrue(usermetaValue0.compareTo(millisOneSecAgo) >=0 && usermetaValue0.compareTo(millisNow) <= 0); + } + + @Test + public void testProvenance() throws InitializationException { + final String PROV1_FILE = "provfile1"; + + final PutS3Object processor = new PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.KEY, "${filename}"); + + Map attribs = new HashMap<>(); + attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE); + runner.enqueue("prov1 contents".getBytes(), attribs); + + runner.assertValid(); + runner.run(); + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS); + final List successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + Assert.assertEquals(1, successFiles.size()); + + final List provenanceEvents = runner.getProvenanceEvents(); + Assert.assertEquals(1, provenanceEvents.size()); + ProvenanceEventRecord provRec1 = provenanceEvents.get(0); + Assert.assertEquals(ProvenanceEventType.SEND, provRec1.getEventType()); + Assert.assertEquals(processor.getIdentifier(), provRec1.getComponentId()); + client.setRegion(Region.fromValue(REGION).toAWSRegion()); + String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString(); + Assert.assertEquals(targetUri, provRec1.getTransitUri()); + Assert.assertEquals(7, provRec1.getUpdatedAttributes().size()); + Assert.assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY)); + } + + @Test + public void testStateDefaults() { + PutS3Object.MultipartState state1 = new PutS3Object.MultipartState(); + Assert.assertEquals(state1.getUploadId(), ""); + Assert.assertEquals(state1.getFilePosition(), (Long) 0L); + Assert.assertEquals(state1.getPartETags().size(), 0L); + Assert.assertEquals(state1.getPartSize(), (Long) 0L); + Assert.assertEquals(state1.getStorageClass().toString(), StorageClass.Standard.toString()); + 
Assert.assertEquals(state1.getContentLength(), (Long) 0L); + } + + @Test + public void testStateToString() throws IOException, InitializationException { + final String target = "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4#20002#REDUCED_REDUNDANCY#30003#8675309"; + PutS3Object.MultipartState state2 = new PutS3Object.MultipartState(); + state2.setUploadId("UID-test1234567890"); + state2.setFilePosition(10001L); + state2.setTimestamp(8675309L); + for (Integer partNum = 1; partNum < 5; partNum++) { + state2.addPartETag(new PartETag(partNum, "PartETag-" + partNum.toString())); + } + state2.setPartSize(20002L); + state2.setStorageClass(StorageClass.ReducedRedundancy); + state2.setContentLength(30003L); + Assert.assertEquals(target, state2.toString()); + } + + @Test + public void testEndpointOverride() { + // remove leading "/" from filename to avoid duplicate separators + final String TESTKEY = AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME.substring(1); + + final PutS3Object processor = new TestablePutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + final ProcessContext context = runner.getProcessContext(); + + runner.setProperty(PutS3Object.ENDPOINT_OVERRIDE, TEST_ENDPOINT); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.KEY, TESTKEY); + + runner.run(); + + Assert.assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString()); + Assert.assertEquals(TESTKEY, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString()); + Assert.assertEquals(TEST_ENDPOINT, context.getProperty(PutS3Object.ENDPOINT_OVERRIDE).toString()); + + String s3url = ((TestablePutS3Object)processor).testable_getClient().getResourceUrl(BUCKET_NAME, TESTKEY); + Assert.assertEquals(TEST_ENDPOINT + "/" + BUCKET_NAME + "/" + TESTKEY, s3url); + } + + @Test + public void testMultipartProperties() throws IOException { + final PutS3Object processor = new PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + final ProcessContext context = runner.getProcessContext(); + + runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST, + "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5"); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.KEY, AbstractS3IT.SAMPLE_FILE_RESOURCE_NAME); + + Assert.assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString()); + Assert.assertEquals(SAMPLE_FILE_RESOURCE_NAME, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString()); + Assert.assertEquals(TEST_PARTSIZE_LONG.longValue(), + context.getProperty(PutS3Object.MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue()); + } + + @Test + public void testLocalStatePersistence() throws IOException { + final PutS3Object processor = new PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + + final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue(); + final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue(); + final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key; + final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2"; + final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3"; + + /* 
+ * store 3 versions of state + */ + PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState(); + processor.persistLocalState(cacheKey1, state1orig); + + PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState(); + state2orig.setUploadId("1234"); + state2orig.setContentLength(1234L); + processor.persistLocalState(cacheKey2, state2orig); + + PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState(); + state3orig.setUploadId("5678"); + state3orig.setContentLength(5678L); + processor.persistLocalState(cacheKey3, state3orig); + + final List uploadList = new ArrayList<>(); + final MultipartUpload upload1 = new MultipartUpload(); + upload1.setKey(key); + upload1.setUploadId(""); + uploadList.add(upload1); + final MultipartUpload upload2 = new MultipartUpload(); + upload2.setKey(key + "-v2"); + upload2.setUploadId("1234"); + uploadList.add(upload2); + final MultipartUpload upload3 = new MultipartUpload(); + upload3.setKey(key + "-v3"); + upload3.setUploadId("5678"); + uploadList.add(upload3); + final MultipartUploadListing uploadListing = new MultipartUploadListing(); + uploadListing.setMultipartUploads(uploadList); + final MockAmazonS3Client mockClient = new MockAmazonS3Client(); + mockClient.setListing(uploadListing); + + /* + * reload and validate stored state + */ + final PutS3Object.MultipartState state1new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey1); + Assert.assertEquals("", state1new.getUploadId()); + Assert.assertEquals(0L, state1new.getFilePosition().longValue()); + Assert.assertEquals(new ArrayList(), state1new.getPartETags()); + Assert.assertEquals(0L, state1new.getPartSize().longValue()); + Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass()); + Assert.assertEquals(0L, state1new.getContentLength().longValue()); + + final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2); + Assert.assertEquals("1234", state2new.getUploadId()); + Assert.assertEquals(0L, state2new.getFilePosition().longValue()); + Assert.assertEquals(new ArrayList(), state2new.getPartETags()); + Assert.assertEquals(0L, state2new.getPartSize().longValue()); + Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass()); + Assert.assertEquals(1234L, state2new.getContentLength().longValue()); + + final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3); + Assert.assertEquals("5678", state3new.getUploadId()); + Assert.assertEquals(0L, state3new.getFilePosition().longValue()); + Assert.assertEquals(new ArrayList(), state3new.getPartETags()); + Assert.assertEquals(0L, state3new.getPartSize().longValue()); + Assert.assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass()); + Assert.assertEquals(5678L, state3new.getContentLength().longValue()); + } + + @Test + public void testStatePersistsETags() throws IOException { + final PutS3Object processor = new PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + + final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue(); + final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue(); + final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1"; + final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2"; + final String 
cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
+
+        /*
+         * store 3 versions of state
+         */
+        PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
+        processor.persistLocalState(cacheKey1, state1orig);
+
+        PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
+        state2orig.setUploadId("1234");
+        state2orig.setContentLength(1234L);
+        processor.persistLocalState(cacheKey2, state2orig);
+
+        PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
+        state3orig.setUploadId("5678");
+        state3orig.setContentLength(5678L);
+        processor.persistLocalState(cacheKey3, state3orig);
+
+        /*
+         * persist state to caches so that
+         *     1. v2 has 2 and then 4 tags
+         *     2. v3 has 4 and then 2 tags
+         */
+        state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
+        state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
+        processor.persistLocalState(cacheKey2, state2orig);
+        state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
+        state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
+        processor.persistLocalState(cacheKey2, state2orig);
+
+        state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
+        state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
+        state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
+        state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
+        processor.persistLocalState(cacheKey3, state3orig);
+        state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
+        state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
+        processor.persistLocalState(cacheKey3, state3orig);
+
+        final List<MultipartUpload> uploadList = new ArrayList<>();
+        final MultipartUpload upload1 = new MultipartUpload();
+        upload1.setKey(key + "-bv2");
+        upload1.setUploadId("1234");
+        uploadList.add(upload1);
+        final MultipartUpload upload2 = new MultipartUpload();
+        upload2.setKey(key + "-bv3");
+        upload2.setUploadId("5678");
+        uploadList.add(upload2);
+        final MultipartUploadListing uploadListing = new MultipartUploadListing();
+        uploadListing.setMultipartUploads(uploadList);
+        final MockAmazonS3Client mockClient = new MockAmazonS3Client();
+        mockClient.setListing(uploadListing);
+
+        /*
+         * load state and validate that
+         *     1. v2 restore shows 4 tags
+         *     2. v3 restore shows 2 tags
+         */
+        final PutS3Object.MultipartState state2new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey2);
+        Assert.assertEquals("1234", state2new.getUploadId());
+        Assert.assertEquals(4, state2new.getPartETags().size());
+
+        final PutS3Object.MultipartState state3new = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey3);
+        Assert.assertEquals("5678", state3new.getUploadId());
+        Assert.assertEquals(2, state3new.getPartETags().size());
+    }
+
+    @Test
+    public void testStateRemove() throws IOException {
+        final PutS3Object processor = new PutS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+        final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
+
+        final List<MultipartUpload> uploadList = new ArrayList<>();
+        final MultipartUpload upload1 = new MultipartUpload();
+        upload1.setKey(key);
+        upload1.setUploadId("1234");
+        uploadList.add(upload1);
+        final MultipartUploadListing uploadListing = new MultipartUploadListing();
+        uploadListing.setMultipartUploads(uploadList);
+        final MockAmazonS3Client mockClient = new MockAmazonS3Client();
+        mockClient.setListing(uploadListing);
+
+        /*
+         * store state, retrieve and validate, remove and validate
+         */
+        PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
+        stateOrig.setUploadId("1234");
+        stateOrig.setContentLength(1234L);
+        processor.persistLocalState(cacheKey, stateOrig);
+
+        PutS3Object.MultipartState state1 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
+        Assert.assertEquals("1234", state1.getUploadId());
+        Assert.assertEquals(1234L, state1.getContentLength().longValue());
+
+        processor.persistLocalState(cacheKey, null);
+        PutS3Object.MultipartState state2 = processor.getLocalStateIfInS3(mockClient, bucket, cacheKey);
+        Assert.assertNull(state2);
+    }
+
+    @Test
+    public void testMultipartSmallerThanMinimum() throws IOException {
+        final String FILE1_NAME = "file1";
+
+        final byte[] megabyte = new byte[1024 * 1024];
+        final Path tempFile = Files.createTempFile("s3multipart", "tmp");
+        final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
+        long tempByteCount = 0;
+        for (int i = 0; i < 5; i++) {
+            tempOut.write(megabyte);
+            tempByteCount += megabyte.length;
+        }
+        tempOut.close();
+        System.out.println("file size: " + tempByteCount);
+        Assert.assertTrue(tempByteCount < S3_MINIMUM_PART_SIZE);
+
+        Assert.assertTrue(megabyte.length < S3_MINIMUM_PART_SIZE);
+        Assert.assertTrue(TEST_PARTSIZE_LONG >= S3_MINIMUM_PART_SIZE && TEST_PARTSIZE_LONG <= S3_MAXIMUM_OBJECT_SIZE);
+
+        final PutS3Object processor = new PutS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+        runner.enqueue(new FileInputStream(tempFile.toFile()), attributes);
+
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, successFiles.size());
+        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
+        Assert.assertEquals(0, failureFiles.size());
+        MockFlowFile ff1 = successFiles.get(0);
+        Assert.assertEquals(PutS3Object.S3_API_METHOD_PUTOBJECT, ff1.getAttribute(PutS3Object.S3_API_METHOD_ATTR_KEY));
+        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
+        Assert.assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
+        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
+        Assert.assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
+        Assert.assertEquals(tempByteCount, ff1.getSize());
+    }
+
+    @Test
+    public void testMultipartBetweenMinimumAndMaximum() throws IOException {
+        final String FILE1_NAME = "file1";
+
+        final byte[] megabyte = new byte[1024 * 1024];
+        final Path tempFile = Files.createTempFile("s3multipart", "tmp");
+        final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
+        long tempByteCount = 0;
+        for ( ; tempByteCount < TEST_PARTSIZE_LONG + 1; ) {
+            tempOut.write(megabyte);
+            tempByteCount += megabyte.length;
+        }
+        tempOut.close();
+        System.out.println("file size: " + tempByteCount);
+        Assert.assertTrue(tempByteCount > S3_MINIMUM_PART_SIZE && tempByteCount < S3_MAXIMUM_OBJECT_SIZE);
+        Assert.assertTrue(tempByteCount > TEST_PARTSIZE_LONG);
+
+        final PutS3Object processor = new PutS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, TEST_PARTSIZE_STRING);
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+        runner.enqueue(new FileInputStream(tempFile.toFile()), attributes);
+
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, successFiles.size());
+        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
+        Assert.assertEquals(0, failureFiles.size());
+        MockFlowFile ff1 = successFiles.get(0);
+        Assert.assertEquals(PutS3Object.S3_API_METHOD_MULTIPARTUPLOAD, ff1.getAttribute(PutS3Object.S3_API_METHOD_ATTR_KEY));
+        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
+        Assert.assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
+        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
+        Assert.assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
+        Assert.assertEquals(tempByteCount, ff1.getSize());
+    }
+
+    @Ignore
+    @Test
+    public void testMultipartLargerThanObjectMaximum() throws IOException {
+        final String FILE1_NAME = "file1";
+
+        final byte[] megabyte = new byte[1024 * 1024];
+        final Path tempFile = Files.createTempFile("s3multipart", "tmp");
+        final FileOutputStream tempOut = new FileOutputStream(tempFile.toFile());
+        for (int i = 0; i < (S3_MAXIMUM_OBJECT_SIZE / 1024 / 1024 + 1); i++) {
+            tempOut.write(megabyte);
+        }
+        tempOut.close();
+
+        final PutS3Object processor = new PutS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+        runner.enqueue(new FileInputStream(tempFile.toFile()), attributes);
+
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, successFiles.size());
+        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
+        Assert.assertEquals(0, failureFiles.size());
+        MockFlowFile ff1 = successFiles.get(0);
+        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
+        Assert.assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
+        Assert.assertEquals(FILE1_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
+        Assert.assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
+        Assert.assertTrue(ff1.getSize() > S3_MAXIMUM_OBJECT_SIZE);
+    }
+
+    @Test
+    public void testS3MultipartAgeoff() throws InterruptedException, IOException {
+        final PutS3Object processor = new PutS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        final ProcessContext context = runner.getProcessContext();
+
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+
+        // set check interval and age off to minimum values
+        runner.setProperty(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL, "1 milli");
+        runner.setProperty(PutS3Object.MULTIPART_S3_MAX_AGE, "1 milli");
+
+        // create some dummy uploads
+        for (Integer i = 0; i < 3; i++) {
+            final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(
+                    BUCKET_NAME, "file" + i.toString() + ".txt");
+            try {
+                client.initiateMultipartUpload(initiateRequest);
+            } catch (AmazonClientException e) {
+                Assert.fail("Failed to initiate upload: " + e.getMessage());
+            }
+        }
+
+        // Age off is time dependent, so the test has some timing constraints. This
+        // sleep() delays long enough to satisfy both the check interval and the max age.
+        Thread.sleep(2000L);
+
+        // System millis are used for timing, but the value is incremented on each
+        // call to circumvent what appears to be caching in the AWS library.
+        // The increments are 1000 millis because AWS returns upload
+        // initiation times in whole seconds.
+        Long now = System.currentTimeMillis();
+
+        MultipartUploadListing uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now);
+        Assert.assertEquals(3, uploadList.getMultipartUploads().size());
+
+        MultipartUpload upload0 = uploadList.getMultipartUploads().get(0);
+        processor.abortS3MultipartUpload(client, BUCKET_NAME, upload0);
+
+        uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now+1000);
+        Assert.assertEquals(2, uploadList.getMultipartUploads().size());
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test-upload.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.run();
+
+        uploadList = processor.getS3AgeoffListAndAgeoffLocalState(context, client, now+2000);
+        Assert.assertEquals(0, uploadList.getMultipartUploads().size());
+    }
+
+    private class MockAmazonS3Client extends AmazonS3Client {
+        MultipartUploadListing listing;
+
+        public void setListing(MultipartUploadListing newlisting) {
+            listing = newlisting;
+        }
+
+        @Override
+        public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest listMultipartUploadsRequest)
+                throws AmazonClientException, AmazonServiceException {
+            return listing;
+        }
+    }
+
+    public class TestablePutS3Object extends PutS3Object {
+        public AmazonS3Client testable_getClient() {
+            return this.getClient();
+        }
+    }
+}
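
For reference (not part of the patch): the two part-size tests above pivot on the API-method attribute that PutS3Object writes (S3_API_METHOD_ATTR_KEY) to show whether a single PutObject call or a multipart upload was used. A minimal sketch of the same check with an in-memory payload might look like the code below. It assumes the surrounding test class's CREDENTIALS_FILE, REGION, BUCKET_NAME and TEST_PARTSIZE_STRING constants plus live S3 access, and the method name, filename and 1 KB payload are illustrative only.

    @Test
    public void testSmallPayloadUsesPutObjectSketch() {
        // Hypothetical sketch mirroring testMultipartSmallerThanMinimum, but enqueueing bytes directly.
        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());

        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
        runner.setProperty(PutS3Object.REGION, REGION);
        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.FILENAME.key(), "small-file");
        runner.enqueue(new byte[1024], attributes);   // far below the configured part size

        runner.assertValid();
        runner.run();

        // A payload this small is expected to take the single PutObject path, not multipart.
        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
        final MockFlowFile ff = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
        Assert.assertEquals(PutS3Object.S3_API_METHOD_PUTOBJECT, ff.getAttribute(PutS3Object.S3_API_METHOD_ATTR_KEY));
    }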