NIFI-5478: PutS3Object support for new Storage Classes

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3608.
This commit is contained in:
Peter Turcsanyi 2019-07-26 15:27:32 +02:00 committed by Pierre Villard
parent b938fc20a3
commit a5bdecbd25
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
4 changed files with 70 additions and 28 deletions

View File

@@ -155,7 +155,8 @@ public class PutS3Object extends AbstractS3Processor {
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class") .name("Storage Class")
.required(true) .required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) .allowableValues(StorageClass.Standard.name(), StorageClass.IntelligentTiering.name(), StorageClass.StandardInfrequentAccess.name(),
StorageClass.OneZoneInfrequentAccess.name(), StorageClass.Glacier.name(), StorageClass.DeepArchive.name(), StorageClass.ReducedRedundancy.name())
.defaultValue(StorageClass.Standard.name()) .defaultValue(StorageClass.Standard.name())
.build(); .build();
@@ -512,9 +513,10 @@ public class PutS3Object extends AbstractS3Processor {
if (result.getExpirationTime() != null) { if (result.getExpirationTime() != null) {
attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString()); attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
} }
if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { if (result.getMetadata().getStorageClass() != null) {
attributes.put(S3_STORAGECLASS_ATTR_KEY, attributes.put(S3_STORAGECLASS_ATTR_KEY, result.getMetadata().getStorageClass());
result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); } else {
attributes.put(S3_STORAGECLASS_ATTR_KEY, StorageClass.Standard.toString());
} }
if (userMetadata.size() > 0) { if (userMetadata.size() > 0) {
StringBuilder userMetaBldr = new StringBuilder(); StringBuilder userMetaBldr = new StringBuilder();

View File

@@ -294,38 +294,59 @@ public class ITPutS3Object extends AbstractS3IT {
} }
@Test @Test
public void testStorageClass() throws IOException { public void testStorageClasses() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION); runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
int bytesNeeded = 55 * 1024 * 1024;
StringBuilder bldr = new StringBuilder(bytesNeeded + 1000);
for (int line = 0; line < 55; line++) {
bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line));
}
String data55mb = bldr.toString();
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>(); for (StorageClass storageClass : StorageClass.values()) {
attrs.put("filename", "folder/2.txt"); runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name());
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
attrs.put("filename", "folder/3.txt");
runner.enqueue(data55mb.getBytes(), attrs);
runner.run(2); final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "testStorageClasses/small_" + storageClass.name() + ".txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2); runner.run();
FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
Assert.assertEquals(StorageClass.ReducedRedundancy.toString(), runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); FlowFile file = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1); Assert.assertEquals(storageClass.toString(), file.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
Assert.assertEquals(StorageClass.ReducedRedundancy.toString(),
file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY)); runner.clearTransferState();
}
}
@Test
public void testStorageClassesMultipart() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
for (StorageClass storageClass : StorageClass.values()) {
runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "testStorageClasses/large_" + storageClass.name() + ".dat");
runner.enqueue(new byte[50 * 1024 * 1024 + 1], attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
FlowFile file = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
Assert.assertEquals(storageClass.toString(), file.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
runner.clearTransferState();
}
} }
@Test @Test
@@ -415,7 +436,7 @@ public class ITPutS3Object extends AbstractS3IT {
client.setRegion(Region.fromValue(REGION).toAWSRegion()); client.setRegion(Region.fromValue(REGION).toAWSRegion());
String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString(); String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString();
Assert.assertEquals(targetUri, provRec1.getTransitUri()); Assert.assertEquals(targetUri, provRec1.getTransitUri());
Assert.assertEquals(7, provRec1.getUpdatedAttributes().size()); Assert.assertEquals(8, provRec1.getUpdatedAttributes().size());
Assert.assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY)); Assert.assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY));
} }

View File

@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag; import com.amazonaws.services.s3.model.Tag;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
@@ -147,6 +148,24 @@ public class TestPutS3Object {
assertEquals("true", tagSet.get(0).getValue()); assertEquals("true", tagSet.get(0).getValue());
} }
@Test
public void testStorageClasses() {
for (StorageClass storageClass : StorageClass.values()) {
runner.setProperty(PutS3Object.STORAGE_CLASS, storageClass.name());
prepareTest();
runner.run(1);
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
PutObjectRequest request = captureRequest.getValue();
assertEquals(storageClass.toString(), request.getStorageClass());
Mockito.reset(mockS3Client);
}
}
@Test @Test
public void testFilenameWithNationalCharacters() throws UnsupportedEncodingException { public void testFilenameWithNationalCharacters() throws UnsupportedEncodingException {
prepareTest("Iñtërnâtiônàližætiøn.txt"); prepareTest("Iñtërnâtiônàližætiøn.txt");

View File

@@ -26,7 +26,7 @@
<packaging>pom</packaging> <packaging>pom</packaging>
<properties> <properties>
<aws-java-sdk-version>1.11.461</aws-java-sdk-version> <aws-java-sdk-version>1.11.599</aws-java-sdk-version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>