From a5bdecbd25a5ef46bd75bdb926d44cde8bc79c96 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Fri, 26 Jul 2019 15:27:32 +0200 Subject: [PATCH] NIFI-5478: PutS3Object support for new Storage Classes Signed-off-by: Pierre Villard This closes #3608. --- .../nifi/processors/aws/s3/PutS3Object.java | 10 +-- .../nifi/processors/aws/s3/ITPutS3Object.java | 67 ++++++++++++------- .../processors/aws/s3/TestPutS3Object.java | 19 ++++++ nifi-nar-bundles/nifi-aws-bundle/pom.xml | 2 +- 4 files changed, 70 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index d331d15c7f..537d269be0 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -155,7 +155,8 @@ public class PutS3Object extends AbstractS3Processor { public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() .name("Storage Class") .required(true) - .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) + .allowableValues(StorageClass.Standard.name(), StorageClass.IntelligentTiering.name(), StorageClass.StandardInfrequentAccess.name(), + StorageClass.OneZoneInfrequentAccess.name(), StorageClass.Glacier.name(), StorageClass.DeepArchive.name(), StorageClass.ReducedRedundancy.name()) .defaultValue(StorageClass.Standard.name()) .build(); @@ -512,9 +513,10 @@ public class PutS3Object extends AbstractS3Processor { if (result.getExpirationTime() != null) { attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString()); } - if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { - attributes.put(S3_STORAGECLASS_ATTR_KEY, - result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); + if (result.getMetadata().getStorageClass() != null) { + attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass()); + } else { + attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString()); } if (userMetadata.size() > 0) { StringBuilder userMetaBldr = new StringBuilder(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 5cb1a970e5..1bbfa28995 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -294,38 +294,59 @@ public class ITPutS3Object extends AbstractS3IT { } @Test - public void testStorageClass() throws IOException { + public void testStorageClasses() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.REGION, REGION); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); - runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name()); - - int bytesNeeded = 55 * 1024 * 1024; - StringBuilder bldr = new StringBuilder(bytesNeeded + 1000); - for (int line = 0; line < 55; line++) { - bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line)); - } - String data55mb = bldr.toString(); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "folder/2.txt"); - runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); - attrs.put("filename", "folder/3.txt"); - runner.enqueue(data55mb.getBytes(), attrs); + for (StorageClass storageClass : StorageClass.values()) { + runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name()); - runner.run(2); + final Map attrs = new HashMap<>(); + attrs.put("filename", "testStorageClasses/small_" + storageClass.name() + ".txt"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); - runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2); - FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0); - Assert.assertEquals(StorageClass.ReducedRedundancy.toString(), - file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); - FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1); - Assert.assertEquals(StorageClass.ReducedRedundancy.toString(), - file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + FlowFile file = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0); + Assert.assertEquals(storageClass.toString(), file.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); + + runner.clearTransferState(); + } + } + + @Test + public void testStorageClassesMultipart() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); + + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB"); + runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB"); + + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + + for (StorageClass storageClass : StorageClass.values()) { + runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name()); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "testStorageClasses/large_" + storageClass.name() + ".dat"); + runner.enqueue(new byte[50 * 1024 * 1024 + 1], attrs); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + FlowFile file = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0); + Assert.assertEquals(storageClass.toString(), file.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); + + runner.clearTransferState(); + } } @Test @@ -415,7 +436,7 @@ public class ITPutS3Object extends AbstractS3IT { client.setRegion(Region.fromValue(REGION).toAWSRegion()); String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString(); Assert.assertEquals(targetUri, provRec1.getTransitUri()); - Assert.assertEquals(7, provRec1.getUpdatedAttributes().size()); + Assert.assertEquals(8, provRec1.getUpdatedAttributes().size()); Assert.assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY)); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 67ebae790e..6bb750faf4 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.amazonaws.services.s3.model.StorageClass; import com.amazonaws.services.s3.model.Tag; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -147,6 +148,24 @@ public class TestPutS3Object { assertEquals("true", tagSet.get(0).getValue()); } + @Test + public void testStorageClasses() { + for (StorageClass storageClass : StorageClass.values()) { + runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name()); + prepareTest(); + + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + PutObjectRequest request = captureRequest.getValue(); + + assertEquals(storageClass.toString(), request.getStorageClass()); + + Mockito.reset(mockS3Client); + } + } + @Test public void testFilenameWithNationalCharacters() throws UnsupportedEncodingException { prepareTest("Iñtërnâtiônàližætiøn.txt"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/pom.xml index ef58e4564d..fceaa92618 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/pom.xml @@ -26,7 +26,7 @@ pom - 1.11.461 + 1.11.599