NIFI-6487 Add S3 User Metadata to ListS3 processor

Fix imports auto formatted by intellij

NIFI-6487 Fix WriteAttribute documentation

This closes #3603.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
JF Beauvais 2019-07-25 14:20:35 +02:00 committed by Koji Kawamura
parent 1d22e8a86d
commit cdee1d8c09
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
4 changed files with 117 additions and 18 deletions

View File

@ -26,9 +26,19 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@ -53,14 +63,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
@PrimaryNodeOnly
@TriggerSerially
@ -85,6 +87,8 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
@WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable"),
@WritesAttribute(attribute = "s3.tag.___", description = "If 'Write Object Tags' is set to 'True', the tags associated to the S3 object that is being listed " +
"will be written as part of the flowfile attributes"),
@WritesAttribute(attribute = "s3.user.metadata.___", description = "If 'Write User Metadata' is set to 'True', the user defined metadata associated to the S3 object that is being listed " +
"will be written as part of the flowfile attributes")})
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
public class ListS3 extends AbstractS3Processor {
@ -150,8 +154,18 @@ public class ListS3 extends AbstractS3Processor {
.defaultValue("false")
.build();
public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
.name("write-s3-user-metadata")
.displayName("Write User Metadata")
.description("If set to 'True', the user defined metadata associated with the S3 object will be written as FlowFile attributes")
.required(true)
.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
.defaultValue("false")
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, CREDENTIALS_FILE,
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
@ -286,6 +300,9 @@ public class ListS3 extends AbstractS3Processor {
if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
attributes.putAll(writeObjectTags(client, versionSummary));
}
if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
attributes.putAll(writeUserMetadata(client, versionSummary));
}
// Create the flowfile
FlowFile flowFile = session.create();
@ -354,6 +371,17 @@ public class ListS3 extends AbstractS3Processor {
return tagMap;
}
private Map<String, String> writeUserMetadata(AmazonS3 client, S3VersionSummary versionSummary) {
ObjectMetadata objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
final Map<String, String> metadata = new HashMap<>();
if (objectMetadata != null) {
for (Map.Entry<String, String> e : objectMetadata.getUserMetadata().entrySet()) {
metadata.put("s3.user.metadata." + e.getKey(), e.getValue());
}
}
return metadata;
}
private interface S3BucketLister {
public void setBucketName(String bucketName);
public void setPrefix(String prefix);

View File

@ -42,6 +42,7 @@ import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.fail;
@ -148,6 +149,14 @@ public abstract class AbstractS3IT {
client.putObject(putRequest);
}
protected void putFileWithUserMetadata(String key, File file, Map<String, String> userMetadata) throws AmazonS3Exception, FileNotFoundException {
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setUserMetadata(userMetadata);
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, new FileInputStream(file), objectMetadata);
client.putObject(putRequest);
}
protected void putFileWithObjectTag(String key, File file, List<Tag> objectTags) {
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file);
putRequest.setTagging(new ObjectTagging(objectTags));

View File

@ -24,9 +24,12 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -173,4 +176,33 @@ public class ITListS3 extends AbstractS3IT {
flowFiles.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2");
}
@Test
public void testUserMetadataWritten() throws FileNotFoundException {
Map<String, String> userMetadata = new HashMap<>();
userMetadata.put("dummy.metadata.1", "dummyvalue1");
userMetadata.put("dummy.metadata.2", "dummyvalue2");
putFileWithUserMetadata("b/fileWithUserMetadata", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), userMetadata);
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(ListS3.PREFIX, "b/");
runner.setProperty(ListS3.REGION, REGION);
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
runner.run();
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
MockFlowFile flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
flowFiles.assertAttributeEquals("filename", "b/fileWithUserMetadata");
flowFiles.assertAttributeExists("s3.user.metadata.dummy.metadata.1");
flowFiles.assertAttributeExists("s3.user.metadata.dummy.metadata.2");
flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.1", "dummyvalue1");
flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.2", "dummyvalue2");
}
}

View File

@ -23,7 +23,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
@ -34,14 +43,6 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListVersionsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import org.junit.Before;
import org.junit.Test;
@ -319,11 +320,39 @@ public class TestListS3 {
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
}
@Test
public void testWriteUserMetadata() {
runner.setProperty(ListS3.REGION, "eu-west-1");
runner.setProperty(ListS3.BUCKET, "test-bucket");
runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
Date lastModified = new Date();
ObjectListing objectListing = new ObjectListing();
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
objectSummary1.setBucketName("test-bucket");
objectSummary1.setKey("a");
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<GetObjectMetadataRequest> captureRequest = ArgumentCaptor.forClass(GetObjectMetadataRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).getObjectMetadata(captureRequest.capture());
GetObjectMetadataRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("a", request.getKey());
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
}
@Test
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 21, pd.size());
assertEquals("size should be eq", 22, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));
@ -331,6 +360,7 @@ public class TestListS3 {
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(ListS3.REGION));
assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS));
assertTrue(pd.contains(ListS3.WRITE_USER_METADATA));
assertTrue(pd.contains(ListS3.SECRET_KEY));
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));