NIFI-25: Added AWS processors

This commit is contained in:
Mark Payne 2015-03-14 17:07:29 -04:00
parent 71989128b9
commit 373f470b6d
22 changed files with 1662 additions and 4 deletions

View File

@ -496,6 +496,21 @@ The following binary components are provided under the Apache Software License v
Apache License Version 2.0 http://www.apache.org/licenses/. Apache License Version 2.0 http://www.apache.org/licenses/.
(c) Daniel Lemire, http://lemire.me/en/ (c) Daniel Lemire, http://lemire.me/en/
(ASLv2) Amazon Web Services SDK
The following NOTICE information applies:
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
This product includes software developed by
Amazon Technologies, Inc (http://www.amazon.com/).
**********************
THIRD PARTY COMPONENTS
**********************
This software includes third party software subject to the following copyrights:
- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
- JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org.
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
************************ ************************
Common Development and Distribution License 1.1 Common Development and Distribution License 1.1

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-nar</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-processors</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-aws-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,179 @@
package org.apache.nifi.processors.aws;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to success after being successfully copied to Amazon S3").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
.name("Credentials File")
.expressionLanguageSupported(false)
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();
public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
.name("Access Key")
.expressionLanguageSupported(false)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
.name("Secret Key")
.expressionLanguageSupported(false)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
.name("Region")
.required(true)
.allowableValues(getAvailableRegions())
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
.build();
private volatile ClientType client;
private static AllowableValue createAllowableValue(final Regions regions) {
return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
}
private static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for ( final Regions regions : Regions.values() ) {
values.add(createAllowableValue(regions));
}
return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
if ( (accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet) ) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
}
final boolean credentialsFileSet = validationContext.getProperty(CREDENTAILS_FILE).isSet();
if ( (secretKeySet || accessKeySet) && credentialsFileSet ) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
}
return problems;
}
protected ClientConfiguration createConfiguration(final ProcessContext context) {
final ClientConfiguration config = new ClientConfiguration();
config.setMaxConnections(context.getMaxConcurrentTasks());
config.setMaxErrorRetry(0);
config.setUserAgent("NiFi");
final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
config.setConnectionTimeout(commsTimeout);
config.setSocketTimeout(commsTimeout);
return config;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
this.client = awsClient;
// if the processor supports REGION, get the configured region.
if ( getSupportedPropertyDescriptors().contains(REGION) ) {
final String region = context.getProperty(REGION).getValue();
if ( region != null ) {
client.setRegion(Region.getRegion(Regions.fromName(region)));
}
}
}
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
protected ClientType getClient() {
return client;
}
protected AWSCredentials getCredentials(final ProcessContext context) {
final String accessKey = context.getProperty(ACCESS_KEY).getValue();
final String secretKey = context.getProperty(SECRET_KEY).getValue();
final String credentialsFile = context.getProperty(CREDENTAILS_FILE).getValue();
if ( credentialsFile != null ) {
try {
return new PropertiesCredentials(new File(credentialsFile));
} catch (final IOException ioe) {
throw new ProcessException("Could not read Credentials File", ioe);
}
}
if ( accessKey != null && secretKey != null ) {
return new BasicAWSCredentials(accessKey, secretKey);
}
return new AnonymousAWSCredentials();
}
protected boolean isEmpty(final String value) {
return value == null || value.trim().equals("");
}
}

View File

@ -0,0 +1,155 @@
package org.apache.nifi.processors.aws.s3;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.EmailAddressGrantee;
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> {
public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
.name("FullControl User List")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object")
.defaultValue("${s3.permissions.full.users}")
.build();
public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder()
.name("Read Permission User List")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object")
.defaultValue("${s3.permissions.read.users}")
.build();
public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder()
.name("Write Permission User List")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object")
.defaultValue("${s3.permissions.write.users}")
.build();
public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder()
.name("Read ACL User List")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object")
.defaultValue("${s3.permissions.readacl.users}")
.build();
public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder()
.name("Write ACL User List")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
.defaultValue("${s3.permissions.writeacl.users}")
.build();
public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("The Amazon ID to use for the object's owner")
.defaultValue("${s3.owner}")
.build();
public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
.name("Bucket")
.expressionLanguageSupported(true)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Object Key")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("${filename}")
.build();
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
return new AmazonS3Client(credentials, config);
}
protected Grantee createGrantee(final String value) {
if ( isEmpty(value) ) {
return null;
}
if ( value.contains("@") ) {
return new EmailAddressGrantee(value);
} else {
return new CanonicalGrantee(value);
}
}
protected final List<Grantee> createGrantees(final String value) {
if ( isEmpty(value) ) {
return Collections.emptyList();
}
final List<Grantee> grantees = new ArrayList<>();
final String[] vals = value.split(",");
for ( final String val : vals ) {
final String identifier = val.trim();
final Grantee grantee = createGrantee(identifier);
if ( grantee != null ) {
grantees.add(grantee);
}
}
return grantees;
}
protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
final AccessControlList acl = new AccessControlList();
final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
if ( !isEmpty(ownerId) ) {
final Owner owner = new Owner();
owner.setId(ownerId);
acl.setOwner(owner);
}
for ( final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.FullControl);
}
for ( final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.Read);
}
for ( final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.Write);
}
for ( final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.ReadAcp);
}
for ( final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.WriteAcp);
}
return acl;
}
}

View File

@ -0,0 +1,165 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
@Tags({"Amazon", "S3", "AWS", "Get"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
public class GetS3Object extends AbstractS3Processor {
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
.name("Version")
.description("The Version of the Object to download")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final PropertyDescriptor BYTE_RANGE_START = new PropertyDescriptor.Builder()
.name("First Byte Index")
.description("The 0-based index of the first byte to download. If specified, the first N bytes will be skipped, where N is the value of this property. If this value is greater than the size of the object, the FlowFile will be routed to failure.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BYTE_RANGE_END = new PropertyDescriptor.Builder()
.name("Last Byte Index")
.description("The 0-based index of the last byte to download. If specified, last N bytes will be skipped, where N is the size of the object minus the value of this property. If the value is greater than the size of the object, the content will be downloaded to the end of the object.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID,
BYTE_RANGE_START, BYTE_RANGE_END) );
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final AmazonS3 client = getClient();
final GetObjectRequest request;
if ( versionId == null ) {
request = new GetObjectRequest(bucket, key);
} else {
request = new GetObjectRequest(bucket, key, versionId);
}
final Long byteRangeStart;
final Long byteRangeEnd;
try {
final PropertyValue startVal = context.getProperty(BYTE_RANGE_START).evaluateAttributeExpressions(flowFile);
byteRangeStart = startVal.isSet() ? startVal.asLong() : 0L;
final PropertyValue endVal = context.getProperty(BYTE_RANGE_END).evaluateAttributeExpressions(flowFile);
byteRangeEnd = endVal.isSet() ? endVal.asLong() : Long.MAX_VALUE;
} catch (final NumberFormatException nfe) {
getLogger().error("Failed to determine byte range for download for {} due to {}", new Object[] {flowFile, nfe});
session.transfer(flowFile, REL_FAILURE);
return;
}
if ( byteRangeStart != null && byteRangeEnd != null ) {
if ( byteRangeEnd.longValue() < byteRangeStart.longValue() ) {
getLogger().error("Failed to download object from S3 for {} because Start Byte Range is {} and End Byte Range is {}, which is less", new Object[] {flowFile, byteRangeStart, byteRangeEnd});
session.transfer(flowFile, REL_FAILURE);
return;
}
request.setRange(byteRangeStart.longValue(), byteRangeEnd.longValue());
}
final Map<String, String> attributes = new HashMap<>();
try (final S3Object s3Object = client.getObject(request)) {
flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
attributes.put("s3.bucket", s3Object.getBucketName());
final ObjectMetadata metadata = s3Object.getObjectMetadata();
if ( metadata.getContentDisposition() != null ) {
final String fullyQualified = metadata.getContentDisposition();
final int lastSlash = fullyQualified.lastIndexOf("/");
if ( lastSlash > -1 && lastSlash < fullyQualified.length() - 1 ) {
attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
} else {
attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
}
}
if (metadata.getContentMD5() != null ) {
attributes.put("hash.value", metadata.getContentMD5());
attributes.put("hash.algorithm", "MD5");
}
if ( metadata.getContentType() != null ) {
attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType());
}
if ( metadata.getETag() != null ) {
attributes.put("s3.etag", metadata.getETag());
}
if ( metadata.getExpirationTime() != null ) {
attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime()));
}
if ( metadata.getExpirationTimeRuleId() != null ) {
attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId());
}
if ( metadata.getUserMetadata() != null ) {
attributes.putAll(metadata.getUserMetadata());
}
if ( metadata.getVersionId() != null ) {
attributes.put("s3.version", metadata.getVersionId());
}
} catch (final IOException | AmazonClientException ioe) {
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[] {flowFile, ioe});
session.transfer(flowFile, REL_FAILURE);
return;
}
if ( !attributes.isEmpty() ) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[] {flowFile, transferMillis});
session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
}
}

View File

@ -0,0 +1,149 @@
package org.apache.nifi.processors.aws.s3;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.StorageClass;
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
public class PutS3Object extends AbstractS3Processor {
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class")
.required(true)
.allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
.defaultValue(StorageClass.Standard.name())
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER) );
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final AmazonS3 s3 = getClient();
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
objectMetadata.setContentLength(ff.getSize());
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
if ( expirationRule != null ) {
objectMetadata.setExpirationTimeRuleId(expirationRule);
}
final Map<String, String> userMetadata = new HashMap<>();
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
if ( entry.getKey().isDynamic() ) {
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
userMetadata.put(entry.getKey().getName(), value);
}
}
if ( !userMetadata.isEmpty() ) {
objectMetadata.setUserMetadata(userMetadata);
}
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
final AccessControlList acl = createACL(context, ff);
if ( acl != null ) {
request.setAccessControlList(acl);
}
final PutObjectResult result = s3.putObject(request);
if ( result.getVersionId() != null ) {
attributes.put("s3.version", result.getVersionId());
}
attributes.put("s3.etag", result.getETag());
final Date expiration = result.getExpirationTime();
if ( expiration != null ) {
attributes.put("s3.expiration", expiration.toString());
}
}
}
});
if ( !attributes.isEmpty() ) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final String url = "http://" + bucket + ".s3.amazonaws.com/" + key;
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
} catch (final ProcessException | AmazonClientException pe) {
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -0,0 +1,44 @@
package org.apache.nifi.processors.aws.sns;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.sns.AmazonSNSClient;
public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
protected static final AllowableValue ARN_TYPE_TOPIC =
new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
protected static final AllowableValue ARN_TYPE_TARGET =
new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
.name("Amazon Resource Name (ARN)")
.description("The name of the resource to which notifications should be published")
.expressionLanguageSupported(true)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
.name("ARN Type")
.description("The type of Amazon Resource Name that is being used.")
.expressionLanguageSupported(false)
.required(true)
.allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
.defaultValue(ARN_TYPE_TOPIC.getValue())
.build();
@Override
protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
return new AmazonSNSClient(credentials, config);
}
}

View File

@ -0,0 +1,139 @@
package org.apache.nifi.processors.aws.sns;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
// TODO: Allow user to choose 'content strategy'
// 1. Content from message body or attributes? If attributes, allow new property for configuring EL expression. Otherwise, no property.
// 2. Same content to all subscribers or different content per subscription type?
// If same, just use single property.
// If different, must use Attribute values for each and have a separate property for each type of subscription (HTTP, HTTPS, E-mail, SMS, etc.)
@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
public class PutSNS extends AbstractSNSProcessor {
public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The character set in which the FlowFile's content is encoded")
.defaultValue("UTF-8")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
.name("Use JSON Structure")
.description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'. Additional elements can be used to send different messages to different protocols. See the Amazon SNS Documentation for more information.")
.defaultValue("false")
.allowableValues("true", "false")
.required(true)
.build();
public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
.name("E-mail Subject")
.description("The optional subject to use for any subscribers that are subscribed via E-mail")
.expressionLanguageSupported(true)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT,
USE_JSON_STRUCTURE, CHARACTER_ENCODING) );
public static final int MAX_SIZE = 256 * 1024;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.dynamic(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
if ( flowFile.getSize() > MAX_SIZE ) {
getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[] {flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String message = new String(baos.toByteArray(), charset);
final AmazonSNSClient client = getClient();
final PublishRequest request = new PublishRequest();
request.setMessage(message);
if ( context.getProperty(USE_JSON_STRUCTURE).asBoolean() ) {
request.setMessageStructure("json");
}
final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
final String arnType = context.getProperty(ARN_TYPE).getValue();
if ( arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue()) ) {
request.setTopicArn(arn);
} else {
request.setTargetArn(arn);
}
final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
if ( subject != null ) {
request.setSubject(subject);
}
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
if ( entry.getKey().isDynamic() && !isEmpty(entry.getValue()) ) {
final MessageAttributeValue value = new MessageAttributeValue();
value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
value.setDataType("String");
request.addMessageAttributesEntry(entry.getKey().getName(), value);
}
}
try {
client.publish(request);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, arn);
getLogger().info("Successfully published notification for {}", new Object[] {flowFile});
} catch (final Exception e) {
getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
}
}

View File

@ -0,0 +1,35 @@
package org.apache.nifi.processors.aws.sqs;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of messages to send in a single network request")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("25")
.build();
public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
.name("Queue URL")
.description("The URL of the queue to act upon")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(true)
.build();
@Override
protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
return new AmazonSQSClient(credentials, config);
}
}

View File

@ -0,0 +1,79 @@
package org.apache.nifi.processors.aws.sqs;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
@Tags({"Amazon", "SQS", "Queue", "Delete"})
@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
public class DeleteSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
.name("Receipt Handle")
.description("The identifier that specifies the receipt of the message")
.expressionLanguageSupported(true)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("${sqs.receipt.handle}")
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT) );
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
// TODO: Process batch of FlowFiles. To do this, we have to first poll the first
// FlowFile and then use session.get(FlowFileFilter) to get up to BATCH_SIZE - 1
// more results that have the same Queue URL.
List<FlowFile> flowFiles = session.get(1);
if ( flowFiles.isEmpty() ) {
return;
}
final FlowFile firstFlowFile = flowFiles.get(0);
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
final AmazonSQSClient client = getClient();
final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
request.setQueueUrl(queueUrl);
final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
for ( final FlowFile flowFile : flowFiles ) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
entries.add(entry);
}
request.setEntries(entries);
try {
client.deleteMessageBatch(request);
getLogger().info("Successfully deleted {} objects from SQS", new Object[] {flowFiles.size()});
session.transfer(flowFiles, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[] {flowFiles.size(), e});
session.transfer(flowFiles, REL_FAILURE);
}
}
}

View File

@ -0,0 +1,177 @@
package org.apache.nifi.processors.aws.sqs;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@Tags({ "Amazon", "SQS", "Queue", "Get", "Fetch", "Poll"})
@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
public class GetSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set that should be used to encode the textual content of the SQS message")
.required(true)
.defaultValue("UTF-8")
.allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
.build();
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
.name("Auto Delete Messages")
.description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Visibility Timeout")
.description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
.expressionLanguageSupported(false)
.required(true)
.defaultValue("15 mins")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of messages to send in a single network request")
.required(true)
.addValidator(StandardValidators.createLongValidator(1L, 10L, true))
.defaultValue("10")
.build();
public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(QUEUE_URL)
.expressionLanguageSupported(false)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT) );
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
final AmazonSQSClient client = getClient();
final ReceiveMessageRequest request = new ReceiveMessageRequest();
request.setAttributeNames(Collections.singleton("All"));
request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
request.setQueueUrl(queueUrl);
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final ReceiveMessageResult result;
try {
result = client.receiveMessage(request);
} catch (final Exception e) {
getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[] {e});
context.yield();
return;
}
final List<Message> messages = result.getMessages();
if ( messages.isEmpty() ) {
context.yield();
return;
}
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
for ( final Message message : messages ) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
for ( final Map.Entry<String, String> entry : message.getAttributes().entrySet() ) {
attributes.put("sqs." + entry.getKey(), entry.getValue());
}
for ( final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet() ) {
attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
}
attributes.put("hash.value", message.getMD5OfBody());
attributes.put("hash.algorithm", "md5");
attributes.put("sqs.message.id", message.getMessageId());
attributes.put("sqs.receipt.handle", message.getReceiptHandle());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(message.getBody().getBytes(charset));
}
});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, queueUrl);
getLogger().info("Successfully received {} from Amazon SQS", new Object[] {flowFile});
}
if ( autoDelete ) {
// If we want to auto-delete messages, we must fist commit the session to ensure that the data
// is persisted in NiFi's repositories.
session.commit();
final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
deleteRequest.setQueueUrl(queueUrl);
final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
for ( final Message message : messages ) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setId(message.getMessageId());
entry.setReceiptHandle(message.getReceiptHandle());
deleteRequestEntries.add(entry);
}
deleteRequest.setEntries(deleteRequestEntries);
try {
client.deleteMessageBatch(deleteRequest);
} catch (final Exception e) {
getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[] {messages.size(), e});
}
}
}
}

View File

@ -0,0 +1,127 @@
package org.apache.nifi.processors.aws.sqs;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
@Tags({"Amazon", "SQS", "Queue", "Put", "Publish"})
@CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
public class PutSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
.name("Delay")
.description("The amount of time to delay the message before it becomes available to consumers")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 secs")
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT) );
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(true)
.required(false)
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
@OnScheduled
public void setup(final ProcessContext context) {
userDefinedProperties = new ArrayList<>();
for ( final PropertyDescriptor descriptor : context.getProperties().keySet() ) {
if ( descriptor.isDynamic() ) {
userDefinedProperties.add(descriptor);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
// TODO: Process batch of FlowFiles. To do this, we have to first poll the first
// FlowFile and then use session.get(FlowFileFilter) to get up to BATCH_SIZE - 1
// more results that have the same Queue URL.
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final long startNanos = System.nanoTime();
final AmazonSQSClient client = getClient();
final SendMessageBatchRequest request = new SendMessageBatchRequest();
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
request.setQueueUrl(queueUrl);
final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
entry.setId(flowFile.getAttribute("uuid"));
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String flowFileContent = baos.toString();
entry.setMessageBody(flowFileContent);
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
for ( final PropertyDescriptor descriptor : userDefinedProperties ) {
final MessageAttributeValue mav = new MessageAttributeValue();
mav.setDataType("String");
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
messageAttributes.put(descriptor.getName(), mav);
}
entry.setMessageAttributes(messageAttributes);
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
entries.add(entry);
request.setEntries(entries);
try {
client.sendMessageBatch(request);
} catch (final Exception e) {
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[] {e});
session.transfer(flowFile, REL_FAILURE);
return;
}
getLogger().info("Successfully published message to Amazon SQS for {}", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);
}
}

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.aws.s3.GetS3Object
org.apache.nifi.processors.aws.s3.PutS3Object
org.apache.nifi.processors.aws.sns.PutSNS
org.apache.nifi.processors.aws.sqs.GetSQS
org.apache.nifi.processors.aws.sqs.PutSQS
org.apache.nifi.processors.aws.sqs.DeleteSQS

View File

@ -0,0 +1,47 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestGetS3Object {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testGet() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new GetS3Object());
runner.setProperty(GetS3Object.BUCKET, "anonymous-test-bucket-00000000");
runner.setProperty(GetS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(GetS3Object.KEY, "folder/1.txt");
runner.setProperty(GetS3Object.BYTE_RANGE_START, "${start}");
runner.setProperty(GetS3Object.BYTE_RANGE_END, String.valueOf(Long.MAX_VALUE));
final Map<String, String> attrs = new HashMap<>();
attrs.put("start", "0");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(GetS3Object.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(GetS3Object.REL_SUCCESS);
final MockFlowFile out = ffs.iterator().next();
final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
out.assertContentEquals(new String(expectedBytes));
for ( final Map.Entry<String, String> entry : out.getAttributes().entrySet() ) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
}

View File

@ -0,0 +1,87 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.amazonaws.services.s3.model.StorageClass;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutS3Object {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
for (int i=0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
}
runner.run(3);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
}
@Test
public void testPutInFolder() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
@Test
public void testStorageClass() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/2.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
@Test
public void testPermissions() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST, "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/4.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
}

View File

@ -0,0 +1,34 @@
package org.apache.nifi.processors.aws.sns;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutSNS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testPublish() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1");
assertTrue( runner.setProperty("DynamicProperty", "hello!").isValid() );
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
}
}

View File

@ -0,0 +1,32 @@
package org.apache.nifi.processors.aws.sqs;
import java.util.List;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestGetSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testSimpleGet() {
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
for ( final MockFlowFile mff : flowFiles ) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}
}
}

View File

@ -0,0 +1,35 @@
package org.apache.nifi.processors.aws.sqs;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.processors.aws.sns.PutSNS;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
}

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-aws-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-aws-processors</module>
<module>nifi-aws-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.9.24</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -12,8 +12,7 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
--> --><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
@ -35,6 +34,7 @@
<module>nifi-update-attribute-bundle</module> <module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module> <module>nifi-kafka-bundle</module>
<module>nifi-kite-bundle</module> <module>nifi-kite-bundle</module>
<module>nifi-aws-bundle</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>