NIFI-12642 Added support for FileResourceService in PutS3Object

This closes #8295.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Balázs Gerner authored on 2024-01-22 13:11:18 +01:00; committed by Peter Turcsanyi
parent dff7ea3535
commit c1a21ad078
5 changed files with 476 additions and 341 deletions
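
This change lets PutS3Object take the payload it uploads either from the incoming FlowFile's content (the default) or from a local file supplied by a FileResourceService controller service, selected through the shared resource transfer properties. A condensed sketch of the selection logic introduced in the PutS3Object diff below, not the full upload path:

    Optional<FileResource> optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
    try (InputStream in = optFileResource
            .map(FileResource::getInputStream)
            .orElseGet(() -> session.read(flowFileCopy))) {
        // upload "in" to S3; the content length comes from the FileResource
        // when one is present, otherwise from the FlowFile itself
    }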

nifi-aws-processors/pom.xml

@@ -38,6 +38,15 @@
<artifactId>nifi-listed-entity</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-resource-transfer</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-abstract-processors</artifactId>
@@ -141,6 +150,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

PutS3Object.java

@@ -53,14 +53,15 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import java.io.File;
import java.io.FileInputStream;
@@ -78,6 +79,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
@@ -87,6 +89,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
@SupportsBatching
@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -261,6 +267,8 @@ public class PutS3Object extends AbstractS3Processor {
KEY,
S3_REGION,
AWS_CREDENTIALS_PROVIDER_SERVICE,
RESOURCE_TRANSFER_SOURCE,
FILE_RESOURCE_SERVICE,
STORAGE_CLASS,
ENCRYPTION_SERVICE,
SERVER_SIDE_ENCRYPTION,
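RESOURCE_TRANSFER_SOURCE and FILE_RESOURCE_SERVICE are not declared in this processor; they are shared descriptors statically imported from ResourceTransferProperties in the new nifi-resource-transfer dependency. A rough sketch of their assumed shape, for orientation only (the authoritative definitions live in that module):

    // Assumed shape; names, descriptions and defaults may differ in ResourceTransferProperties.
    public static final PropertyDescriptor RESOURCE_TRANSFER_SOURCE = new PropertyDescriptor.Builder()
            .name("Resource Transfer Source")
            .description("The source of the content to be transferred")
            .allowableValues(ResourceTransferSource.class)
            .defaultValue(ResourceTransferSource.FLOWFILE_CONTENT.getValue())
            .required(true)
            .build();

    public static final PropertyDescriptor FILE_RESOURCE_SERVICE = new PropertyDescriptor.Builder()
            .name("File Resource Service")
            .description("File Resource Service providing access to the local file to be transferred")
            .identifiesControllerService(FileResourceService.class)
            .dependsOn(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE)
            .required(true)
            .build();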
@@ -501,6 +509,8 @@ public class PutS3Object extends AbstractS3Processor {
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
attributes.put(S3_BUCKET_KEY, bucket);
attributes.put(S3_OBJECT_KEY, key);
@@ -519,329 +529,332 @@
         */
        try {
            final FlowFile flowFileCopy = flowFile;
            Optional<FileResource> optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
            try (InputStream in = optFileResource
                    .map(FileResource::getInputStream)
                    .orElseGet(() -> session.read(flowFileCopy))) {
                final ObjectMetadata objectMetadata = new ObjectMetadata();
                objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));

                final String contentType = context.getProperty(CONTENT_TYPE)
                        .evaluateAttributeExpressions(ff).getValue();
                if (contentType != null) {
                    objectMetadata.setContentType(contentType);
                    attributes.put(S3_CONTENT_TYPE, contentType);
                }

                final String cacheControl = context.getProperty(CACHE_CONTROL)
                        .evaluateAttributeExpressions(ff).getValue();
                if (cacheControl != null) {
                    objectMetadata.setCacheControl(cacheControl);
                    attributes.put(S3_CACHE_CONTROL, cacheControl);
                }

                final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
                String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
                if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
                    objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
                    attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
                } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
                    String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
                    objectMetadata.setContentDisposition(contentDispositionValue);
                    attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
                } else {
                    objectMetadata.setContentDisposition(fileName);
                }

                final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
                        .evaluateAttributeExpressions(ff).getValue();
                if (expirationRule != null) {
                    objectMetadata.setExpirationTimeRuleId(expirationRule);
                }

                final Map<String, String> userMetadata = new HashMap<>();
                for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
                    if (entry.getKey().isDynamic()) {
                        final String value = context.getProperty(
                                entry.getKey()).evaluateAttributeExpressions(ff).getValue();
                        userMetadata.put(entry.getKey().getName(), value);
                    }
                }

                final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
                AmazonS3EncryptionService encryptionService = null;

                if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
                    objectMetadata.setSSEAlgorithm(serverSideEncryption);
                    attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
                } else {
                    encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
                }

                if (!userMetadata.isEmpty()) {
                    objectMetadata.setUserMetadata(userMetadata);
                }

                if (ff.getSize() <= multipartThreshold) {
                    //----------------------------------------
                    // single part upload
                    //----------------------------------------
                    final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
                    if (encryptionService != null) {
                        encryptionService.configurePutObjectRequest(request, objectMetadata);
                        attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
                    }

                    request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
                    final AccessControlList acl = createACL(context, ff);
                    if (acl != null) {
                        request.setAccessControlList(acl);
                    }

                    final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
                    if (cannedAcl != null) {
                        request.withCannedAcl(cannedAcl);
                    }

                    if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
                        request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
                    }

                    try {
                        final PutObjectResult result = s3.putObject(request);
                        if (result.getVersionId() != null) {
                            attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
                        }
                        if (result.getETag() != null) {
                            attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
                        }
                        if (result.getExpirationTime() != null) {
                            attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
                        }
                        if (result.getMetadata().getStorageClass() != null) {
                            attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
                        } else {
                            attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
                        }
                        if (userMetadata.size() > 0) {
                            StringBuilder userMetaBldr = new StringBuilder();
                            for (String userKey : userMetadata.keySet()) {
                                userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
                            }
                            attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
                        }
                        attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
                    } catch (AmazonClientException e) {
                        getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
                                ffFilename, bucket, key, e.getMessage());
                        throw (e);
                    }
                } else {
                    //----------------------------------------
                    // multipart upload
                    //----------------------------------------

                    // load or create persistent state
                    //------------------------------------------------------------
                    MultipartState currentState;
                    try {
                        currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
                        if (currentState != null) {
                            if (currentState.getPartETags().size() > 0) {
                                final PartETag lastETag = currentState.getPartETags().get(
                                        currentState.getPartETags().size() - 1);
                                getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
                                                "contentLength='{}' partsLoaded={} lastPart={}/{}",
                                        ffFilename, bucket, key, currentState.getUploadId(),
                                        currentState.getFilePosition(), currentState.getPartSize(),
                                        currentState.getStorageClass().toString(),
                                        currentState.getContentLength(),
                                        currentState.getPartETags().size(),
                                        Integer.toString(lastETag.getPartNumber()),
                                        lastETag.getETag());
                            } else {
                                getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
                                                "contentLength='{}' no partsLoaded",
                                        ffFilename, bucket, key, currentState.getUploadId(),
                                        currentState.getFilePosition(), currentState.getPartSize(),
                                        currentState.getStorageClass().toString(),
                                        currentState.getContentLength());
                            }
                        } else {
                            currentState = new MultipartState();
                            currentState.setPartSize(multipartPartSize);
                            currentState.setStorageClass(
                                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
                            currentState.setContentLength(ff.getSize());
                            persistLocalState(cacheKey, currentState);
                            getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
                                    ffFilename, bucket, key);
                        }
                    } catch (IOException e) {
                        getLogger().error("IOException initiating cache state while processing flow files: " +
                                e.getMessage());
                        throw (e);
                    }

                    // initiate multipart upload or find position in file
                    //------------------------------------------------------------
                    if (currentState.getUploadId().isEmpty()) {
                        final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
                        if (encryptionService != null) {
                            encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
                            attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
                        }
                        initiateRequest.setStorageClass(currentState.getStorageClass());

                        final AccessControlList acl = createACL(context, ff);
                        if (acl != null) {
                            initiateRequest.setAccessControlList(acl);
                        }
                        final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
                        if (cannedAcl != null) {
                            initiateRequest.withCannedACL(cannedAcl);
                        }

                        if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
                            initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
                        }

                        try {
                            final InitiateMultipartUploadResult initiateResult =
                                    s3.initiateMultipartUpload(initiateRequest);
                            currentState.setUploadId(initiateResult.getUploadId());
                            currentState.getPartETags().clear();
                            try {
                                persistLocalState(cacheKey, currentState);
                            } catch (Exception e) {
                                getLogger().info("Exception saving cache state while processing flow file: " +
                                        e.getMessage());
                                throw (new ProcessException("Exception saving cache state", e));
                            }
                            getLogger().info("Success initiating upload flowfile={} available={} position={} " +
                                            "length={} bucket={} key={} uploadId={}",
                                    new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
                                            currentState.getContentLength(), bucket, key,
                                            currentState.getUploadId()});
                            if (initiateResult.getUploadId() != null) {
                                attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
                            }
                        } catch (AmazonClientException e) {
                            getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
                                    new Object[]{ffFilename, bucket, key, e.getMessage()});
                            throw (e);
                        }
                    } else {
                        if (currentState.getFilePosition() > 0) {
                            try {
                                final long skipped = in.skip(currentState.getFilePosition());
                                if (skipped != currentState.getFilePosition()) {
                                    getLogger().info("Failure skipping to resume upload flowfile={} " +
                                                    "bucket={} key={} position={} skipped={}",
                                            new Object[]{ffFilename, bucket, key,
                                                    currentState.getFilePosition(), skipped});
                                }
                            } catch (Exception e) {
                                getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
                                                "key={} position={} reason={}",
                                        new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
                                                e.getMessage()});
                                throw (new ProcessException(e));
                            }
                        }
                    }

                    // upload parts
                    //------------------------------------------------------------
                    long thisPartSize;
                    boolean isLastPart;
                    for (int part = currentState.getPartETags().size() + 1;
                         currentState.getFilePosition() < currentState.getContentLength(); part++) {
                        if (!PutS3Object.this.isScheduled()) {
                            throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
                                    " part=" + part + " uploadId=" + currentState.getUploadId());
                        }
                        thisPartSize = Math.min(currentState.getPartSize(),
                                (currentState.getContentLength() - currentState.getFilePosition()));
                        isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
                        UploadPartRequest uploadRequest = new UploadPartRequest()
                                .withBucketName(bucket)
                                .withKey(key)
                                .withUploadId(currentState.getUploadId())
                                .withInputStream(in)
                                .withPartNumber(part)
                                .withPartSize(thisPartSize)
                                .withLastPart(isLastPart);
                        if (encryptionService != null) {
                            encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
                        }
                        try {
                            UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
                            currentState.addPartETag(uploadPartResult.getPartETag());
                            currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
                            try {
                                persistLocalState(cacheKey, currentState);
                            } catch (Exception e) {
                                getLogger().info("Exception saving cache state processing flow file: " +
                                        e.getMessage());
                            }
                            int available = 0;
                            try {
                                available = in.available();
                            } catch (IOException e) {
                                // in case of the last part, the stream is already closed
                            }
                            getLogger().info("Success uploading part flowfile={} part={} available={} " +
                                    "etag={} uploadId={}", new Object[]{ffFilename, part, available,
                                    uploadPartResult.getETag(), currentState.getUploadId()});
                        } catch (AmazonClientException e) {
                            getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
                                    "reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
                            throw (e);
                        }
                    }

                    // complete multipart upload
                    //------------------------------------------------------------
                    CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
                            bucket, key, currentState.getUploadId(), currentState.getPartETags());

                    // No call to an encryption service is needed for a CompleteMultipartUploadRequest.
                    try {
                        CompleteMultipartUploadResult completeResult =
                                s3.completeMultipartUpload(completeRequest);
                        getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
                                new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
                        if (completeResult.getVersionId() != null) {
                            attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
                        }
                        if (completeResult.getETag() != null) {
                            attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
                        }
                        if (completeResult.getExpirationTime() != null) {
                            attributes.put(S3_EXPIRATION_ATTR_KEY,
                                    completeResult.getExpirationTime().toString());
                        }
                        if (currentState.getStorageClass() != null) {
                            attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
                        }
                        if (userMetadata.size() > 0) {
                            StringBuilder userMetaBldr = new StringBuilder();
                            for (String userKey : userMetadata.keySet()) {
                                userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
                            }
                            attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
                        }
                        attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
                    } catch (AmazonClientException e) {
                        getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
                                new Object[]{ffFilename, bucket, key, e.getMessage()});
                        throw (e);
                    }
                }
            }
        } catch (IOException e) {
            getLogger().error("Error during upload of flow files: " + e.getMessage());
            throw e;
        }
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -852,25 +865,25 @@ public class PutS3Object extends AbstractS3Processor {
            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            session.getProvenanceReporter().send(flowFile, url, millis);
            getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});

            try {
                removeLocalState(cacheKey);
            } catch (IOException e) {
                getLogger().info("Error trying to delete key {} from cache: {}",
                        new Object[]{cacheKey, e.getMessage()});
            }
        } catch (final ProcessException | AmazonClientException | IOException e) {
            extractExceptionDetails(e, session, flowFile);
            if (e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
                getLogger().info(e.getMessage());
                session.rollback();
            } else {
                getLogger().error("Failed to put {} to Amazon S3 due to {}", flowFile, e);
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
            }
        }
    }
private final Lock s3BucketLock = new ReentrantLock();
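
The stream selection above delegates to ResourceTransferUtils.getFileResource from the nifi-resource-transfer module. A hedged sketch of the contract this call site relies on, inferred from the diff and the tests in this commit rather than from the helper's actual source:

    // Assumed contract of ResourceTransferUtils.getFileResource:
    static Optional<FileResource> getFileResource(final ResourceTransferSource source,
                                                  final ProcessContext context,
                                                  final Map<String, String> attributes) {
        if (source == ResourceTransferSource.FILE_RESOURCE_SERVICE) {
            final FileResourceService service = context.getProperty(FILE_RESOURCE_SERVICE)
                    .asControllerService(FileResourceService.class);
            return Optional.ofNullable(service.getFileResource(attributes));
        }
        return Optional.empty(); // FLOWFILE_CONTENT: callers fall back to session.read()
    }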

AbstractS3IT.java

@@ -41,9 +41,12 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
@@ -70,6 +73,8 @@ import static org.junit.jupiter.api.Assertions.fail;
* @see ITListS3
*/
public abstract class AbstractS3IT {
private static final Logger logger = LoggerFactory.getLogger(AbstractS3IT.class);
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
protected final static String BUCKET_NAME = "test-bucket-" + System.currentTimeMillis();
@@ -82,7 +87,6 @@ public abstract class AbstractS3IT {
private static final LocalStackContainer localstack = new LocalStackContainer(localstackImage)
.withServices(LocalStackContainer.Service.S3, LocalStackContainer.Service.KMS);
@BeforeAll
public static void oneTimeSetup() {
localstack.start();
@@ -101,6 +105,45 @@
client.createBucket(request);
}
@BeforeEach
public void clearKeys() {
addedKeys.clear();
}
@AfterEach
public void emptyBucket() {
if (!client.doesBucketExistV2(BUCKET_NAME)) {
return;
}
ObjectListing objectListing = client.listObjects(BUCKET_NAME);
while (true) {
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
client.deleteObject(BUCKET_NAME, objectSummary.getKey());
}
if (objectListing.isTruncated()) {
objectListing = client.listNextBatchOfObjects(objectListing);
} else {
break;
}
}
}
@AfterAll
public static void oneTimeTearDown() {
try {
if (client == null || !client.doesBucketExistV2(BUCKET_NAME)) {
return;
}
DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
client.deleteBucket(dbr);
} catch (final AmazonS3Exception e) {
logger.error("Unable to delete bucket {}", BUCKET_NAME, e);
}
}
protected AmazonS3 getClient() {
return client;
}
@@ -121,44 +164,6 @@
AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey());
}
protected void putTestFile(String key, File file) throws AmazonS3Exception {
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file);
client.putObject(putRequest);

ITPutS3Object.java

@@ -24,13 +24,17 @@ import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.fileresource.service.StandardFileResourceService;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -53,6 +57,12 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,6 +98,61 @@ public class ITPutS3Object extends AbstractS3IT {
kmsKeyId = getKMSKey();
}
@Test
public void testPutFromLocalFile() throws Exception {
TestRunner runner = initTestRunner();
String attributeName = "file.path";
Path resourcePath = getResourcePath(SAMPLE_FILE_RESOURCE_NAME);
String serviceId = FileResourceService.class.getSimpleName();
FileResourceService service = new StandardFileResourceService();
runner.addControllerService(serviceId, service);
runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
runner.enableControllerService(service);
runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
Map<String, String> attributes = new HashMap<>();
attributes.put(attributeName, resourcePath.toString());
runner.enqueue(resourcePath, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
List<S3ObjectSummary> objectSummaries = getClient().listObjects(BUCKET_NAME).getObjectSummaries();
assertThat(objectSummaries, hasSize(1));
assertEquals(objectSummaries.getFirst().getKey(), resourcePath.getFileName().toString());
assertThat(objectSummaries.getFirst().getSize(), greaterThan(0L));
}
@Test
public void testPutFromNonExistentLocalFile() throws Exception {
TestRunner runner = initTestRunner();
String attributeName = "file.path";
String serviceId = FileResourceService.class.getSimpleName();
FileResourceService service = new StandardFileResourceService();
runner.addControllerService(serviceId, service);
runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
runner.enableControllerService(service);
runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE);
runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
String filePath = "nonexistent.txt";
Map<String, String> attributes = new HashMap<>();
attributes.put(attributeName, filePath);
runner.enqueue("", attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1);
assertThat(getClient().listObjects(BUCKET_NAME).getObjectSummaries(), empty());
}
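
Both integration tests hinge on StandardFileResourceService resolving its FILE_PATH property (here the Expression Language reference ${file.path}) against each FlowFile's attributes. A minimal usage sketch of that per-FlowFile resolution, assuming the behavior these tests exercise:

    Map<String, String> attributes = Map.of("file.path", "/tmp/sample.txt");
    FileResource resource = service.getFileResource(attributes); // fails for a missing file,
                                                                 // which routes the FlowFile to failure
    try (InputStream in = resource.getInputStream()) {
        // PutS3Object streams the local file to S3 instead of reading FlowFile content
    }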
@Test
public void testSimplePut() throws IOException {

TestPutS3Object.java

@@ -37,10 +37,13 @@ import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -50,6 +53,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.File;
import java.io.InputStream;
import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
@@ -57,11 +61,16 @@ import java.util.List;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestPutS3Object {
@@ -87,6 +96,33 @@ public class TestPutS3Object {
runner.setEnvironmentVariableValue("java.io.tmpdir", System.getProperty("java.io.tmpdir"));
}
@Test
public void testPutSinglePartFromLocalFileSource() throws Exception {
prepareTest();
String serviceId = "fileresource";
FileResourceService service = mock(FileResourceService.class);
InputStream localFileInputStream = mock(InputStream.class);
when(service.getIdentifier()).thenReturn(serviceId);
long contentLength = 10L;
when(service.getFileResource(anyMap())).thenReturn(new FileResource(localFileInputStream, contentLength));
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
runner.run();
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(mockS3Client).putObject(captureRequest.capture());
PutObjectRequest putObjectRequest = captureRequest.getValue();
assertEquals(localFileInputStream, putObjectRequest.getInputStream());
assertEquals(putObjectRequest.getMetadata().getContentLength(), contentLength);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
@Test
public void testPutSinglePart() {
runner.setProperty("x-custom-prop", "hello");
@@ -95,7 +131,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
@@ -105,6 +141,7 @@ public class TestPutS3Object {
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
ff0.assertContentEquals("Test Content");
ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
}
@@ -113,7 +150,7 @@
public void testPutSinglePartException() {
prepareTest();
when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail"));
runner.run(1);
@@ -150,7 +187,7 @@
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
List<Tag> tagSet = request.getTagging().getTagSet();
@@ -169,7 +206,7 @@
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals(storageClass.toString(), request.getStorageClass());
@@ -185,7 +222,7 @@
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
ObjectMetadata objectMetadata = request.getMetadata();
@@ -241,10 +278,10 @@
putObjectResult.setVersionId("test-version");
putObjectResult.setETag("test-etag");
when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
MultipartUploadListing uploadListing = new MultipartUploadListing();
when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
}
@Test