NIFI-7827 - PutS3Object - configuration of the multipart temp dir (#4541)

Pierre Villard authored 2020-09-29 15:15:09 +02:00, committed by GitHub
parent fa0a1df23f
commit a57d38c58d
2 changed files with 79 additions and 59 deletions

PutS3Object.java

@@ -52,6 +52,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -107,9 +108,9 @@ import com.amazonaws.services.s3.model.UploadPartResult;
"parts in a multipart upload must be at least 5MB in size, except for the last part. These limits " +
"establish the bounds for the Multipart Upload Threshold and Part Size properties.")
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
value = "The value of a User-Defined Metadata field to add to the S3 Object",
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
value = "The value of a User-Defined Metadata field to add to the S3 Object",
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
@WritesAttributes({
@WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
@@ -130,31 +131,30 @@ public class PutS3Object extends AbstractS3Processor {
public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
public static final String PERSISTENCE_ROOT = "conf/state/";
public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
public static final String CONTENT_DISPOSITION_INLINE = "inline";
public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment";
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Expiration Time Rule")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content Type")
.displayName("Content Type")
.description("Sets the Content-Type HTTP header indicating the type of content stored in the associated " +
"object. The value of this header is a standard MIME type.\n" +
"AWS S3 Java client will attempt to determine the correct content type if one hasn't been set" +
" yet. Users are responsible for ensuring a suitable content type is set when uploading streams. If " +
"no content type is provided and cannot be determined by the filename, the default content type " +
"\"application/octet-stream\" will be used.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Content Type")
.displayName("Content Type")
.description("Sets the Content-Type HTTP header indicating the type of content stored in the associated " +
"object. The value of this header is a standard MIME type.\n" +
"AWS S3 Java client will attempt to determine the correct content type if one hasn't been set" +
" yet. Users are responsible for ensuring a suitable content type is set when uploading streams. If " +
"no content type is provided and cannot be determined by the filename, the default content type " +
"\"application/octet-stream\" will be used.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONTENT_DISPOSITION = new PropertyDescriptor.Builder()
.name("Content Disposition")
@@ -176,12 +176,12 @@ public class PutS3Object extends AbstractS3Processor {
.build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class")
.required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.IntelligentTiering.name(), StorageClass.StandardInfrequentAccess.name(),
StorageClass.OneZoneInfrequentAccess.name(), StorageClass.Glacier.name(), StorageClass.DeepArchive.name(), StorageClass.ReducedRedundancy.name())
.defaultValue(StorageClass.Standard.name())
.build();
.name("Storage Class")
.required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.IntelligentTiering.name(), StorageClass.StandardInfrequentAccess.name(),
StorageClass.OneZoneInfrequentAccess.name(), StorageClass.Glacier.name(), StorageClass.DeepArchive.name(), StorageClass.ReducedRedundancy.name())
.defaultValue(StorageClass.Standard.name())
.build();
public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder()
.name("Multipart Threshold")
@@ -254,12 +254,23 @@ public class PutS3Object extends AbstractS3Processor {
.defaultValue("false")
.build();
+    public static final PropertyDescriptor MULTIPART_TEMP_DIR = new PropertyDescriptor.Builder()
+            .name("s3-temporary-directory-multipart")
+            .displayName("Temporary Directory Multipart State")
+            .description("Directory in which, for multipart uploads, the processor will locally save the state tracking the upload ID and parts "
+                    + "uploaded which must both be provided to complete the upload.")
+            .required(true)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .defaultValue("${java.io.tmpdir}")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
-                    STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
-                    CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
-                    MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
-                    PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+            Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
+                    OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST,
+                    READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE,
+                    MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, MULTIPART_TEMP_DIR, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING,
+                    USE_PATH_STYLE_ACCESS, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key";
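For context: the new Temporary Directory Multipart State property defaults to ${java.io.tmpdir} and is evaluated with variable-registry scope, so it resolves once per scheduling rather than per FlowFile. A minimal standalone sketch (not NiFi framework code; the fallback behaviour is an assumption) of what the default yields, including the existence check that FILE_EXISTS_VALIDATOR enforces:

import java.io.File;

public class MultipartTempDirSketch {
    public static void main(String[] args) {
        // Sketch only: with no variable-registry override, "${java.io.tmpdir}"
        // resolves to the JVM system property, e.g. "/tmp" on most Linux systems.
        String resolvedTempDir = System.getProperty("java.io.tmpdir");

        // FILE_EXISTS_VALIDATOR requires the configured path to exist;
        // this is the standalone equivalent of that validation.
        if (!new File(resolvedTempDir).exists()) {
            throw new IllegalStateException("Multipart temp dir not found: " + resolvedTempDir);
        }
        System.out.println("Multipart state directory: " + resolvedTempDir);
    }
}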
@@ -282,6 +293,13 @@ public class PutS3Object extends AbstractS3Processor {
final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload";
+    private volatile String tempDirMultipart = System.getProperty("java.io.tmpdir");
+
+    @OnScheduled
+    public void setTempDir(final ProcessContext context) {
+        this.tempDirMultipart = context.getProperty(MULTIPART_TEMP_DIR).evaluateAttributeExpressions().getValue();
+    }
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
@@ -290,15 +308,15 @@ public class PutS3Object extends AbstractS3Processor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
protected File getPersistenceFile() {
-        return new File(PERSISTENCE_ROOT + getIdentifier());
+        return new File(this.tempDirMultipart + File.separator + getIdentifier());
}
protected boolean localUploadExistsInS3(final AmazonS3Client s3, final String bucket, final MultipartState localState) {
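A brief walk-through of the reworked getPersistenceFile(), with invented values; the state file now lives under the configured directory instead of the hard-coded conf/state/:

// Illustration with hypothetical values, not code from the commit.
String tempDirMultipart = "/tmp";          // assumed resolved property value
String identifier = "a1b2c3d4-example";    // hypothetical processor id
File persistenceFile = new File(tempDirMultipart + File.separator + identifier);
// -> /tmp/a1b2c3d4-example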
@@ -314,7 +332,7 @@ public class PutS3Object extends AbstractS3Processor {
}
protected synchronized MultipartState getLocalStateIfInS3(final AmazonS3Client s3, final String bucket,
final String s3ObjectKey) throws IOException {
MultipartState currState = getLocalState(s3ObjectKey);
if (currState == null) {
return null;
@@ -498,9 +516,9 @@ public class PutS3Object extends AbstractS3Processor {
objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
} else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
objectMetadata.setContentDisposition(contentDispositionValue);
attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
} else {
objectMetadata.setContentDisposition(fileName);
}
@@ -710,7 +728,7 @@ public class PutS3Object extends AbstractS3Processor {
long thisPartSize;
boolean isLastPart;
for (int part = currentState.getPartETags().size() + 1;
currentState.getFilePosition() < currentState.getContentLength(); part++) {
if (!PutS3Object.this.isScheduled()) {
throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
" part=" + part + " uploadId=" + currentState.getUploadId());
@@ -747,7 +765,7 @@ public class PutS3Object extends AbstractS3Processor {
}
getLogger().info("Success uploading part flowfile={} part={} available={} " +
"etag={} uploadId={}", new Object[]{ffFilename, part, available,
uploadPartResult.getETag(), currentState.getUploadId()});
} catch (AmazonClientException e) {
getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
"reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
@@ -904,16 +922,16 @@ public class PutS3Object extends AbstractS3Processor {
final Map<String, String> attributesMap = flowFile.getAttributes();
attributesMap.entrySet().stream().sequential()
.filter(attribute -> attribute.getKey().startsWith(prefix))
.forEach(attribute -> {
String tagKey = attribute.getKey();
String tagValue = attribute.getValue();
if (context.getProperty(REMOVE_TAG_PREFIX).asBoolean()) {
tagKey = tagKey.replace(prefix, "");
}
objectTags.add(new Tag(tagKey, tagValue));
});
return objectTags;
}
@@ -1020,7 +1038,7 @@
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(_uploadId).append(SEPARATOR)
.append(_filePosition.toString()).append(SEPARATOR);
if (_partETags.size() > 0) {
boolean first = true;
for (PartETag tag : _partETags) {
@@ -1033,10 +1051,10 @@
}
}
buf.append(SEPARATOR)
.append(_partSize.toString()).append(SEPARATOR)
.append(_storageClass.toString()).append(SEPARATOR)
.append(_contentLength.toString()).append(SEPARATOR)
.append(_timestamp.toString());
return buf.toString();
}
}
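A short usage sketch, with a hypothetical directory, showing the new property being set explicitly instead of relying on the ${java.io.tmpdir} default:

import org.apache.nifi.processors.aws.s3.PutS3Object;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class MultipartTempDirUsageSketch {
    public static void main(String[] args) {
        // Sketch only: the directory is hypothetical and must already exist,
        // since the property is checked by FILE_EXISTS_VALIDATOR.
        TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
        runner.setProperty(PutS3Object.MULTIPART_TEMP_DIR, "/data/nifi/multipart-state");
    }
}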

TestPutS3Object.java

@@ -70,6 +70,7 @@ public class TestPutS3Object {
}
};
runner = TestRunners.newTestRunner(putS3Object);
+        runner.setVariable("java.io.tmpdir", "conf/state");
}
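The added setVariable call seeds the mock variable registry that the property's ${java.io.tmpdir} default is evaluated against, steering multipart state into conf/state for these tests. The same idea in isolation (directory hypothetical; precedence of runner variables over the system property is an assumption here):

// Sketch: a variable set on the TestRunner is expected to win when a
// variable-registry-scoped property is evaluated.
runner.setVariable("java.io.tmpdir", "target/multipart-state");
String dir = runner.getProcessContext()
        .getProperty(PutS3Object.MULTIPART_TEMP_DIR)
        .evaluateAttributeExpressions()
        .getValue();   // expected: "target/multipart-state"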
@Test
@@ -210,7 +211,7 @@
public void testGetPropertyDescriptors() {
PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 38, pd.size());
assertEquals("size should be eq", 39, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -249,5 +250,6 @@
assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));
assertTrue(pd.contains(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL));
assertTrue(pd.contains(PutS3Object.MULTIPART_S3_MAX_AGE));
+        assertTrue(pd.contains(PutS3Object.MULTIPART_TEMP_DIR));
}
}