diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index c0748af598..44ac6da9f3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -38,6 +38,15 @@
nifi-listed-entity
2.0.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-resource-transfer
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-file-resource-service-api
+
org.apache.nifi
nifi-aws-abstract-processors
@@ -141,6 +150,12 @@
2.0.0-SNAPSHOT
test
+
+ org.apache.nifi
+ nifi-file-resource-service
+ 2.0.0-SNAPSHOT
+ test
+
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 3247f9a6a8..26c759417a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -53,14 +53,15 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import java.io.File;
import java.io.FileInputStream;
@@ -78,6 +79,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
@@ -87,6 +89,10 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
+
@SupportsBatching
@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -261,6 +267,8 @@ public class PutS3Object extends AbstractS3Processor {
KEY,
S3_REGION,
AWS_CREDENTIALS_PROVIDER_SERVICE,
+ RESOURCE_TRANSFER_SOURCE,
+ FILE_RESOURCE_SERVICE,
STORAGE_CLASS,
ENCRYPTION_SERVICE,
SERVER_SIDE_ENCRYPTION,
@@ -501,6 +509,8 @@ public class PutS3Object extends AbstractS3Processor {
final FlowFile ff = flowFile;
final Map attributes = new HashMap<>();
final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
+ final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
+
attributes.put(S3_BUCKET_KEY, bucket);
attributes.put(S3_OBJECT_KEY, key);
@@ -519,329 +529,332 @@ public class PutS3Object extends AbstractS3Processor {
*/
try {
final FlowFile flowFileCopy = flowFile;
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- final ObjectMetadata objectMetadata = new ObjectMetadata();
- objectMetadata.setContentLength(ff.getSize());
+ Optional optFileResource = getFileResource(resourceTransferSource, context, flowFile.getAttributes());
+ try (InputStream in = optFileResource
+ .map(FileResource::getInputStream)
+ .orElseGet(() -> session.read(flowFileCopy))) {
+ final ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setContentLength(optFileResource.map(FileResource::getSize).orElseGet(ff::getSize));
- final String contentType = context.getProperty(CONTENT_TYPE)
- .evaluateAttributeExpressions(ff).getValue();
- if (contentType != null) {
- objectMetadata.setContentType(contentType);
- attributes.put(S3_CONTENT_TYPE, contentType);
+ final String contentType = context.getProperty(CONTENT_TYPE)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (contentType != null) {
+ objectMetadata.setContentType(contentType);
+ attributes.put(S3_CONTENT_TYPE, contentType);
+ }
+
+ final String cacheControl = context.getProperty(CACHE_CONTROL)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (cacheControl != null) {
+ objectMetadata.setCacheControl(cacheControl);
+ attributes.put(S3_CACHE_CONTROL, cacheControl);
+ }
+
+ final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
+ String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
+ if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
+ objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
+ attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
+ } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
+ String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
+ objectMetadata.setContentDisposition(contentDispositionValue);
+ attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
+ } else {
+ objectMetadata.setContentDisposition(fileName);
+ }
+
+ final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
+ .evaluateAttributeExpressions(ff).getValue();
+ if (expirationRule != null) {
+ objectMetadata.setExpirationTimeRuleId(expirationRule);
+ }
+
+ final Map userMetadata = new HashMap<>();
+ for (final Entry entry : context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ final String value = context.getProperty(
+ entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+ userMetadata.put(entry.getKey().getName(), value);
+ }
+ }
+
+ final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
+ AmazonS3EncryptionService encryptionService = null;
+
+ if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
+ objectMetadata.setSSEAlgorithm(serverSideEncryption);
+ attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
+ } else {
+ encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+ }
+
+ if (!userMetadata.isEmpty()) {
+ objectMetadata.setUserMetadata(userMetadata);
+ }
+
+ if (ff.getSize() <= multipartThreshold) {
+ //----------------------------------------
+ // single part upload
+ //----------------------------------------
+ final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+ if (encryptionService != null) {
+ encryptionService.configurePutObjectRequest(request, objectMetadata);
+ attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
}
- final String cacheControl = context.getProperty(CACHE_CONTROL)
- .evaluateAttributeExpressions(ff).getValue();
- if (cacheControl != null) {
- objectMetadata.setCacheControl(cacheControl);
- attributes.put(S3_CACHE_CONTROL, cacheControl);
+ request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ final AccessControlList acl = createACL(context, ff);
+ if (acl != null) {
+ request.setAccessControlList(acl);
}
- final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
- String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), StandardCharsets.UTF_8);
- if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
- objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
- attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
- } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
- String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
- objectMetadata.setContentDisposition(contentDispositionValue);
- attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
- } else {
- objectMetadata.setContentDisposition(fileName);
+ final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
+ if (cannedAcl != null) {
+ request.withCannedAcl(cannedAcl);
}
- final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
- .evaluateAttributeExpressions(ff).getValue();
- if (expirationRule != null) {
- objectMetadata.setExpirationTimeRuleId(expirationRule);
+ if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
+ request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
}
- final Map userMetadata = new HashMap<>();
- for (final Map.Entry entry : context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- final String value = context.getProperty(
- entry.getKey()).evaluateAttributeExpressions(ff).getValue();
- userMetadata.put(entry.getKey().getName(), value);
+ try {
+ final PutObjectResult result = s3.putObject(request);
+ if (result.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
}
+ if (result.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+ }
+ if (result.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
+ }
+ if (result.getMetadata().getStorageClass() != null) {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
+ } else {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new StringBuilder();
+ for (String userKey : userMetadata.keySet()) {
+ userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+ }
+ attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+ }
+ attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+ ffFilename, bucket, key, e.getMessage());
+ throw (e);
+ }
+ } else {
+ //----------------------------------------
+ // multipart upload
+ //----------------------------------------
+
+ // load or create persistent state
+ //------------------------------------------------------------
+ MultipartState currentState;
+ try {
+ currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
+ if (currentState != null) {
+ if (currentState.getPartETags().size() > 0) {
+ final PartETag lastETag = currentState.getPartETags().get(
+ currentState.getPartETags().size() - 1);
+ getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}' partsLoaded={} lastPart={}/{}",
+ ffFilename, bucket, key, currentState.getUploadId(),
+ currentState.getFilePosition(), currentState.getPartSize(),
+ currentState.getStorageClass().toString(),
+ currentState.getContentLength(),
+ currentState.getPartETags().size(),
+ Integer.toString(lastETag.getPartNumber()),
+ lastETag.getETag());
+ } else {
+ getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}' no partsLoaded",
+ ffFilename, bucket, key, currentState.getUploadId(),
+ currentState.getFilePosition(), currentState.getPartSize(),
+ currentState.getStorageClass().toString(),
+ currentState.getContentLength());
+ }
+ } else {
+ currentState = new MultipartState();
+ currentState.setPartSize(multipartPartSize);
+ currentState.setStorageClass(
+ StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ currentState.setContentLength(ff.getSize());
+ persistLocalState(cacheKey, currentState);
+ getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
+ ffFilename, bucket, key);
+ }
+ } catch (IOException e) {
+ getLogger().error("IOException initiating cache state while processing flow files: " +
+ e.getMessage());
+ throw (e);
}
- final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
- AmazonS3EncryptionService encryptionService = null;
-
- if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
- objectMetadata.setSSEAlgorithm(serverSideEncryption);
- attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
- } else {
- encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
- }
-
- if (!userMetadata.isEmpty()) {
- objectMetadata.setUserMetadata(userMetadata);
- }
-
- if (ff.getSize() <= multipartThreshold) {
- //----------------------------------------
- // single part upload
- //----------------------------------------
- final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+ // initiate multipart upload or find position in file
+ //------------------------------------------------------------
+ if (currentState.getUploadId().isEmpty()) {
+ final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
if (encryptionService != null) {
- encryptionService.configurePutObjectRequest(request, objectMetadata);
+ encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
}
+ initiateRequest.setStorageClass(currentState.getStorageClass());
- request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
final AccessControlList acl = createACL(context, ff);
if (acl != null) {
- request.setAccessControlList(acl);
+ initiateRequest.setAccessControlList(acl);
}
-
final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
if (cannedAcl != null) {
- request.withCannedAcl(cannedAcl);
+ initiateRequest.withCannedACL(cannedAcl);
}
if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
- request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
+ initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
}
try {
- final PutObjectResult result = s3.putObject(request);
- if (result.getVersionId() != null) {
- attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
- }
- if (result.getETag() != null) {
- attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
- }
- if (result.getExpirationTime() != null) {
- attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
- }
- if (result.getMetadata().getStorageClass() != null) {
- attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
- } else {
- attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
- }
- if (userMetadata.size() > 0) {
- StringBuilder userMetaBldr = new StringBuilder();
- for (String userKey : userMetadata.keySet()) {
- userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
- }
- attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
- }
- attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_PUTOBJECT);
- } catch (AmazonClientException e) {
- getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
- ffFilename, bucket, key, e.getMessage());
- throw (e);
- }
- } else {
- //----------------------------------------
- // multipart upload
- //----------------------------------------
-
- // load or create persistent state
- //------------------------------------------------------------
- MultipartState currentState;
- try {
- currentState = getLocalStateIfInS3(s3, bucket, cacheKey);
- if (currentState != null) {
- if (currentState.getPartETags().size() > 0) {
- final PartETag lastETag = currentState.getPartETags().get(
- currentState.getPartETags().size() - 1);
- getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
- "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
- "contentLength='{}' partsLoaded={} lastPart={}/{}",
- ffFilename, bucket, key, currentState.getUploadId(),
- currentState.getFilePosition(), currentState.getPartSize(),
- currentState.getStorageClass().toString(),
- currentState.getContentLength(),
- currentState.getPartETags().size(),
- Integer.toString(lastETag.getPartNumber()),
- lastETag.getETag());
- } else {
- getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
- "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
- "contentLength='{}' no partsLoaded",
- ffFilename, bucket, key, currentState.getUploadId(),
- currentState.getFilePosition(), currentState.getPartSize(),
- currentState.getStorageClass().toString(),
- currentState.getContentLength());
- }
- } else {
- currentState = new MultipartState();
- currentState.setPartSize(multipartPartSize);
- currentState.setStorageClass(
- StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
- currentState.setContentLength(ff.getSize());
+ final InitiateMultipartUploadResult initiateResult =
+ s3.initiateMultipartUpload(initiateRequest);
+ currentState.setUploadId(initiateResult.getUploadId());
+ currentState.getPartETags().clear();
+ try {
persistLocalState(cacheKey, currentState);
- getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
- ffFilename, bucket, key);
+ } catch (Exception e) {
+ getLogger().info("Exception saving cache state while processing flow file: " +
+ e.getMessage());
+ throw (new ProcessException("Exception saving cache state", e));
}
- } catch (IOException e) {
- getLogger().error("IOException initiating cache state while processing flow files: " +
- e.getMessage());
- throw (e);
- }
-
- // initiate multipart upload or find position in file
- //------------------------------------------------------------
- if (currentState.getUploadId().isEmpty()) {
- final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
- if (encryptionService != null) {
- encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata);
- attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName());
+ getLogger().info("Success initiating upload flowfile={} available={} position={} " +
+ "length={} bucket={} key={} uploadId={}",
+ new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
+ currentState.getContentLength(), bucket, key,
+ currentState.getUploadId()});
+ if (initiateResult.getUploadId() != null) {
+ attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
}
- initiateRequest.setStorageClass(currentState.getStorageClass());
-
- final AccessControlList acl = createACL(context, ff);
- if (acl != null) {
- initiateRequest.setAccessControlList(acl);
- }
- final CannedAccessControlList cannedAcl = createCannedACL(context, ff);
- if (cannedAcl != null) {
- initiateRequest.withCannedACL(cannedAcl);
- }
-
- if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
- initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
- }
-
- try {
- final InitiateMultipartUploadResult initiateResult =
- s3.initiateMultipartUpload(initiateRequest);
- currentState.setUploadId(initiateResult.getUploadId());
- currentState.getPartETags().clear();
- try {
- persistLocalState(cacheKey, currentState);
- } catch (Exception e) {
- getLogger().info("Exception saving cache state while processing flow file: " +
- e.getMessage());
- throw (new ProcessException("Exception saving cache state", e));
- }
- getLogger().info("Success initiating upload flowfile={} available={} position={} " +
- "length={} bucket={} key={} uploadId={}",
- new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
- currentState.getContentLength(), bucket, key,
- currentState.getUploadId()});
- if (initiateResult.getUploadId() != null) {
- attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
- }
- } catch (AmazonClientException e) {
- getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
- new Object[]{ffFilename, bucket, key, e.getMessage()});
- throw (e);
- }
- } else {
- if (currentState.getFilePosition() > 0) {
- try {
- final long skipped = in.skip(currentState.getFilePosition());
- if (skipped != currentState.getFilePosition()) {
- getLogger().info("Failure skipping to resume upload flowfile={} " +
- "bucket={} key={} position={} skipped={}",
- new Object[]{ffFilename, bucket, key,
- currentState.getFilePosition(), skipped});
- }
- } catch (Exception e) {
- getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
- "key={} position={} reason={}",
- new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
- e.getMessage()});
- throw (new ProcessException(e));
- }
- }
- }
-
- // upload parts
- //------------------------------------------------------------
- long thisPartSize;
- boolean isLastPart;
- for (int part = currentState.getPartETags().size() + 1;
- currentState.getFilePosition() < currentState.getContentLength(); part++) {
- if (!PutS3Object.this.isScheduled()) {
- throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
- " part=" + part + " uploadId=" + currentState.getUploadId());
- }
- thisPartSize = Math.min(currentState.getPartSize(),
- (currentState.getContentLength() - currentState.getFilePosition()));
- isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
- UploadPartRequest uploadRequest = new UploadPartRequest()
- .withBucketName(bucket)
- .withKey(key)
- .withUploadId(currentState.getUploadId())
- .withInputStream(in)
- .withPartNumber(part)
- .withPartSize(thisPartSize)
- .withLastPart(isLastPart);
- if (encryptionService != null) {
- encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
- }
- try {
- UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
- currentState.addPartETag(uploadPartResult.getPartETag());
- currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
- try {
- persistLocalState(cacheKey, currentState);
- } catch (Exception e) {
- getLogger().info("Exception saving cache state processing flow file: " +
- e.getMessage());
- }
- int available = 0;
- try {
- available = in.available();
- } catch (IOException e) {
- // in case of the last part, the stream is already closed
- }
- getLogger().info("Success uploading part flowfile={} part={} available={} " +
- "etag={} uploadId={}", new Object[]{ffFilename, part, available,
- uploadPartResult.getETag(), currentState.getUploadId()});
- } catch (AmazonClientException e) {
- getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
- "reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
- throw (e);
- }
- }
-
- // complete multipart upload
- //------------------------------------------------------------
- CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
- bucket, key, currentState.getUploadId(), currentState.getPartETags());
-
- // No call to an encryption service is needed for a CompleteMultipartUploadRequest.
- try {
- CompleteMultipartUploadResult completeResult =
- s3.completeMultipartUpload(completeRequest);
- getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
- new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
- if (completeResult.getVersionId() != null) {
- attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
- }
- if (completeResult.getETag() != null) {
- attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
- }
- if (completeResult.getExpirationTime() != null) {
- attributes.put(S3_EXPIRATION_ATTR_KEY,
- completeResult.getExpirationTime().toString());
- }
- if (currentState.getStorageClass() != null) {
- attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
- }
- if (userMetadata.size() > 0) {
- StringBuilder userMetaBldr = new StringBuilder();
- for (String userKey : userMetadata.keySet()) {
- userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
- }
- attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
- }
- attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
} catch (AmazonClientException e) {
- getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+ getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
new Object[]{ffFilename, bucket, key, e.getMessage()});
throw (e);
}
+ } else {
+ if (currentState.getFilePosition() > 0) {
+ try {
+ final long skipped = in.skip(currentState.getFilePosition());
+ if (skipped != currentState.getFilePosition()) {
+ getLogger().info("Failure skipping to resume upload flowfile={} " +
+ "bucket={} key={} position={} skipped={}",
+ new Object[]{ffFilename, bucket, key,
+ currentState.getFilePosition(), skipped});
+ }
+ } catch (Exception e) {
+ getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
+ "key={} position={} reason={}",
+ new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
+ e.getMessage()});
+ throw (new ProcessException(e));
+ }
+ }
+ }
+
+ // upload parts
+ //------------------------------------------------------------
+ long thisPartSize;
+ boolean isLastPart;
+ for (int part = currentState.getPartETags().size() + 1;
+ currentState.getFilePosition() < currentState.getContentLength(); part++) {
+ if (!PutS3Object.this.isScheduled()) {
+ throw new IOException(S3_PROCESS_UNSCHEDULED_MESSAGE + " flowfile=" + ffFilename +
+ " part=" + part + " uploadId=" + currentState.getUploadId());
+ }
+ thisPartSize = Math.min(currentState.getPartSize(),
+ (currentState.getContentLength() - currentState.getFilePosition()));
+ isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+ .withUploadId(currentState.getUploadId())
+ .withInputStream(in)
+ .withPartNumber(part)
+ .withPartSize(thisPartSize)
+ .withLastPart(isLastPart);
+ if (encryptionService != null) {
+ encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
+ }
+ try {
+ UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
+ currentState.addPartETag(uploadPartResult.getPartETag());
+ currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+ try {
+ persistLocalState(cacheKey, currentState);
+ } catch (Exception e) {
+ getLogger().info("Exception saving cache state processing flow file: " +
+ e.getMessage());
+ }
+ int available = 0;
+ try {
+ available = in.available();
+ } catch (IOException e) {
+ // in case of the last part, the stream is already closed
+ }
+ getLogger().info("Success uploading part flowfile={} part={} available={} " +
+ "etag={} uploadId={}", new Object[]{ffFilename, part, available,
+ uploadPartResult.getETag(), currentState.getUploadId()});
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
+ "reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
+ throw (e);
+ }
+ }
+
+ // complete multipart upload
+ //------------------------------------------------------------
+ CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
+ bucket, key, currentState.getUploadId(), currentState.getPartETags());
+
+ // No call to an encryption service is needed for a CompleteMultipartUploadRequest.
+ try {
+ CompleteMultipartUploadResult completeResult =
+ s3.completeMultipartUpload(completeRequest);
+ getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
+ new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
+ if (completeResult.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
+ }
+ if (completeResult.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
+ }
+ if (completeResult.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY,
+ completeResult.getExpirationTime().toString());
+ }
+ if (currentState.getStorageClass() != null) {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new StringBuilder();
+ for (String userKey : userMetadata.keySet()) {
+ userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+ }
+ attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+ }
+ attributes.put(S3_API_METHOD_ATTR_KEY, S3_API_METHOD_MULTIPARTUPLOAD);
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket, key, e.getMessage()});
+ throw (e);
}
}
- });
+ } catch (IOException e) {
+ getLogger().error("Error during upload of flow files: " + e.getMessage());
+ throw e;
+ }
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -852,25 +865,25 @@ public class PutS3Object extends AbstractS3Processor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
- getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
+ getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
try {
removeLocalState(cacheKey);
} catch (IOException e) {
getLogger().info("Error trying to delete key {} from cache: {}",
new Object[]{cacheKey, e.getMessage()});
}
- } catch (final ProcessException | AmazonClientException pe) {
- extractExceptionDetails(pe, session, flowFile);
- if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
- getLogger().info(pe.getMessage());
+
+ } catch (final ProcessException | AmazonClientException | IOException e) {
+ extractExceptionDetails(e, session, flowFile);
+ if (e.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
+ getLogger().info(e.getMessage());
session.rollback();
} else {
- getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
+ getLogger().error("Failed to put {} to Amazon S3 due to {}", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
-
}
private final Lock s3BucketLock = new ReentrantLock();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 80b3950716..f8eb6dd5b2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -41,9 +41,12 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
@@ -70,6 +73,8 @@ import static org.junit.jupiter.api.Assertions.fail;
* @see ITListS3
*/
public abstract class AbstractS3IT {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractS3IT.class);
+
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
protected final static String BUCKET_NAME = "test-bucket-" + System.currentTimeMillis();
@@ -82,7 +87,6 @@ public abstract class AbstractS3IT {
private static final LocalStackContainer localstack = new LocalStackContainer(localstackImage)
.withServices(LocalStackContainer.Service.S3, LocalStackContainer.Service.KMS);
-
@BeforeAll
public static void oneTimeSetup() {
localstack.start();
@@ -101,6 +105,45 @@ public abstract class AbstractS3IT {
client.createBucket(request);
}
+ @BeforeEach
+ public void clearKeys() {
+ addedKeys.clear();
+ }
+
+ @AfterEach
+ public void emptyBucket() {
+ if (!client.doesBucketExistV2(BUCKET_NAME)) {
+ return;
+ }
+
+ ObjectListing objectListing = client.listObjects(BUCKET_NAME);
+ while (true) {
+ for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+ client.deleteObject(BUCKET_NAME, objectSummary.getKey());
+ }
+
+ if (objectListing.isTruncated()) {
+ objectListing = client.listNextBatchOfObjects(objectListing);
+ } else {
+ break;
+ }
+ }
+ }
+
+ @AfterAll
+ public static void oneTimeTearDown() {
+ try {
+ if (client == null || !client.doesBucketExistV2(BUCKET_NAME)) {
+ return;
+ }
+
+ DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
+ client.deleteBucket(dbr);
+ } catch (final AmazonS3Exception e) {
+ logger.error("Unable to delete bucket {}", BUCKET_NAME, e);
+ }
+ }
+
protected AmazonS3 getClient() {
return client;
}
@@ -121,44 +164,6 @@ public abstract class AbstractS3IT {
AuthUtils.enableAccessKey(runner, localstack.getAccessKey(), localstack.getSecretKey());
}
- @BeforeEach
- public void clearKeys() {
- addedKeys.clear();
- }
-
- @AfterAll
- public static void oneTimeTearDown() {
- // Empty the bucket before deleting it.
- try {
- if (client == null) {
- return;
- }
-
- ObjectListing objectListing = client.listObjects(BUCKET_NAME);
-
- while (true) {
- for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
- client.deleteObject(BUCKET_NAME, objectSummary.getKey());
- }
-
- if (objectListing.isTruncated()) {
- objectListing = client.listNextBatchOfObjects(objectListing);
- } else {
- break;
- }
- }
-
- DeleteBucketRequest dbr = new DeleteBucketRequest(BUCKET_NAME);
- client.deleteBucket(dbr);
- } catch (final AmazonS3Exception e) {
- System.err.println("Unable to delete bucket " + BUCKET_NAME + e.toString());
- }
-
- if (client.doesBucketExistV2(BUCKET_NAME)) {
- fail("Incomplete teardown, subsequent tests might fail");
- }
- }
-
protected void putTestFile(String key, File file) throws AmazonS3Exception {
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file);
client.putObject(putRequest);
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index cda5e528db..43f936c9f0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -24,13 +24,17 @@ import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import org.apache.commons.codec.binary.Base64;
+import org.apache.nifi.fileresource.service.StandardFileResourceService;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -53,6 +57,12 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,6 +98,61 @@ public class ITPutS3Object extends AbstractS3IT {
kmsKeyId = getKMSKey();
}
+ @Test
+ public void testPutFromLocalFile() throws Exception {
+ TestRunner runner = initTestRunner();
+ String attributeName = "file.path";
+ Path resourcePath = getResourcePath(SAMPLE_FILE_RESOURCE_NAME);
+
+ String serviceId = FileResourceService.class.getSimpleName();
+ FileResourceService service = new StandardFileResourceService();
+ runner.addControllerService(serviceId, service);
+ runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
+ runner.enableControllerService(service);
+
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ Map attributes = new HashMap<>();
+ attributes.put(attributeName, resourcePath.toString());
+ runner.enqueue(resourcePath, attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).getFirst();
+ flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+ List objectSummaries = getClient().listObjects(BUCKET_NAME).getObjectSummaries();
+ assertThat(objectSummaries, hasSize(1));
+ assertEquals(objectSummaries.getFirst().getKey(), resourcePath.getFileName().toString());
+ assertThat(objectSummaries.getFirst().getSize(), greaterThan(0L));
+ }
+
+ @Test
+ public void testPutFromNonExistentLocalFile() throws Exception {
+ TestRunner runner = initTestRunner();
+ String attributeName = "file.path";
+
+ String serviceId = FileResourceService.class.getSimpleName();
+ FileResourceService service = new StandardFileResourceService();
+ runner.addControllerService(serviceId, service);
+ runner.setProperty(service, StandardFileResourceService.FILE_PATH, String.format("${%s}", attributeName));
+ runner.enableControllerService(service);
+
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE);
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ String filePath = "nonexistent.txt";
+
+ Map attributes = new HashMap<>();
+ attributes.put(attributeName, filePath);
+
+ runner.enqueue("", attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1);
+ assertThat(getClient().listObjects(BUCKET_NAME).getObjectSummaries(), empty());
+ }
@Test
public void testSimplePut() throws IOException {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index fff03d7510..ab7bf59fb4 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -37,10 +37,13 @@ import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.signer.AwsSignerType;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -50,6 +53,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.File;
+import java.io.InputStream;
import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
@@ -57,11 +61,16 @@ import java.util.List;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE;
+import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestPutS3Object {
@@ -87,6 +96,33 @@ public class TestPutS3Object {
runner.setEnvironmentVariableValue("java.io.tmpdir", System.getProperty("java.io.tmpdir"));
}
+ @Test
+ public void testPutSinglePartFromLocalFileSource() throws Exception {
+ prepareTest();
+
+ String serviceId = "fileresource";
+ FileResourceService service = mock(FileResourceService.class);
+ InputStream localFileInputStream = mock(InputStream.class);
+ when(service.getIdentifier()).thenReturn(serviceId);
+ long contentLength = 10L;
+ when(service.getFileResource(anyMap())).thenReturn(new FileResource(localFileInputStream, contentLength));
+
+ runner.addControllerService(serviceId, service);
+ runner.enableControllerService(service);
+ runner.setProperty(RESOURCE_TRANSFER_SOURCE, ResourceTransferSource.FILE_RESOURCE_SERVICE.getValue());
+ runner.setProperty(FILE_RESOURCE_SERVICE, serviceId);
+
+ runner.run();
+
+ ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
+ verify(mockS3Client).putObject(captureRequest.capture());
+ PutObjectRequest putObjectRequest = captureRequest.getValue();
+ assertEquals(localFileInputStream, putObjectRequest.getInputStream());
+ assertEquals(putObjectRequest.getMetadata().getContentLength(), contentLength);
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+ }
+
@Test
public void testPutSinglePart() {
runner.setProperty("x-custom-prop", "hello");
@@ -95,7 +131,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
@@ -105,6 +141,7 @@ public class TestPutS3Object {
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
+ ff0.assertContentEquals("Test Content");
ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
}
@@ -113,7 +150,7 @@ public class TestPutS3Object {
public void testPutSinglePartException() {
prepareTest();
- Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail"));
+ when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenThrow(new AmazonS3Exception("TestFail"));
runner.run(1);
@@ -150,7 +187,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
List tagSet = request.getTagging().getTagSet();
@@ -169,7 +206,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals(storageClass.toString(), request.getStorageClass());
@@ -185,7 +222,7 @@ public class TestPutS3Object {
runner.run(1);
ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
- Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
+ verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
ObjectMetadata objectMetadata = request.getMetadata();
@@ -241,10 +278,10 @@ public class TestPutS3Object {
putObjectResult.setVersionId("test-version");
putObjectResult.setETag("test-etag");
- Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
+ when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
MultipartUploadListing uploadListing = new MultipartUploadListing();
- Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
+ when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
}
@Test