NIFI-537 fixed identified licensing issue with several of the new nars

This commit is contained in:
joewitt 2015-04-22 14:50:03 -04:00
parent 0d7838273c
commit 060a1e0d9c
23 changed files with 1256 additions and 958 deletions

View File

@ -512,28 +512,16 @@ The following binary components are provided under the Apache Software License v
JOAuth
Copyright 2010-2013 Twitter, Inc
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
(ASLv2) Hosebird Client
The following NOTICE information applies:
Hosebird Client (hbc)
Copyright 2013 Twitter, Inc.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
(ASLv2) GeoIP2 Java API
The following NOTICE information applies:
GeoIP2 Java API
This software is Copyright (c) 2013 by MaxMind, Inc.
This is free software, licensed under the Apache License, Version 2.0.
(ASLv2) Google HTTP Client Library for Java
The following NOTICE information applies:
Google HTTP Client Library for Java
This is free software, licensed under the Apache License, Version 2.0.
(ASLv2) Amazon Web Services SDK
The following NOTICE information applies:
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
@ -549,7 +537,6 @@ The following binary components are provided under the Apache Software License v
- JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org.
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
************************
Common Development and Distribution License 1.1
************************

View File

@ -1,14 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

View File

@ -0,0 +1,74 @@
nifi-aws-nar
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2014 The Apache Software Foundation
Apache HttpCore
Copyright 2005-2014 The Apache Software Foundation
This project contains annotations derived from JCIP-ANNOTATIONS
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
(ASLv2) Joda Time
The following NOTICE information applies:
This product includes software developed by
Joda.org (http://www.joda.org/).
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Amazon Web Services SDK
The following NOTICE information applies:
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
This product includes software developed by
Amazon Technologies, Inc (http://www.amazon.com/).
**********************
THIRD PARTY COMPONENTS
**********************
This software includes third party software subject to the following copyrights:
- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
- JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org.
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.

View File

@ -55,4 +55,17 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/hello.txt</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -91,17 +91,15 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.defaultValue("30 secs")
.build();
private volatile ClientType client;
private static AllowableValue createAllowableValue(final Regions regions) {
return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
}
private static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for ( final Regions regions : Regions.values() ) {
for (final Regions regions : Regions.values()) {
values.add(createAllowableValue(regions));
}
@ -119,19 +117,18 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
if ( (accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet) ) {
if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
}
final boolean credentialsFileSet = validationContext.getProperty(CREDENTAILS_FILE).isSet();
if ( (secretKeySet || accessKeySet) && credentialsFileSet ) {
if ((secretKeySet || accessKeySet) && credentialsFileSet) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
}
return problems;
}
protected ClientConfiguration createConfiguration(final ProcessContext context) {
final ClientConfiguration config = new ClientConfiguration();
config.setMaxConnections(context.getMaxConcurrentTasks());
@ -145,16 +142,15 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
return config;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
this.client = awsClient;
// if the processor supports REGION, get the configured region.
if ( getSupportedPropertyDescriptors().contains(REGION) ) {
if (getSupportedPropertyDescriptors().contains(REGION)) {
final String region = context.getProperty(REGION).getValue();
if ( region != null ) {
if (region != null) {
client.setRegion(Region.getRegion(Regions.fromName(region)));
}
}
@ -172,7 +168,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
final String credentialsFile = context.getProperty(CREDENTAILS_FILE).getValue();
if ( credentialsFile != null ) {
if (credentialsFile != null) {
try {
return new PropertiesCredentials(new File(credentialsFile));
} catch (final IOException ioe) {
@ -180,14 +176,13 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
}
}
if ( accessKey != null && secretKey != null ) {
if (accessKey != null && secretKey != null) {
return new BasicAWSCredentials(accessKey, secretKey);
}
return new AnonymousAWSCredentials();
}
protected boolean isEmpty(final String value) {
return value == null || value.trim().equals("");
}

View File

@ -100,19 +100,17 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C
.defaultValue("${filename}")
.build();
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
return new AmazonS3Client(credentials, config);
}
protected Grantee createGrantee(final String value) {
if ( isEmpty(value) ) {
if (isEmpty(value)) {
return null;
}
if ( value.contains("@") ) {
if (value.contains("@")) {
return new EmailAddressGrantee(value);
} else {
return new CanonicalGrantee(value);
@ -120,16 +118,16 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C
}
protected final List<Grantee> createGrantees(final String value) {
if ( isEmpty(value) ) {
if (isEmpty(value)) {
return Collections.emptyList();
}
final List<Grantee> grantees = new ArrayList<>();
final String[] vals = value.split(",");
for ( final String val : vals ) {
for (final String val : vals) {
final String identifier = val.trim();
final Grantee grantee = createGrantee(identifier);
if ( grantee != null ) {
if (grantee != null) {
grantees.add(grantee);
}
}
@ -140,29 +138,29 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C
final AccessControlList acl = new AccessControlList();
final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
if ( !isEmpty(ownerId) ) {
if (!isEmpty(ownerId)) {
final Owner owner = new Owner();
owner.setId(ownerId);
acl.setOwner(owner);
}
for ( final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
for (final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.FullControl);
}
for ( final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
for (final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.Read);
}
for ( final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
for (final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.Write);
}
for ( final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
for (final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.ReadAcp);
}
for ( final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
for (final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
acl.grantPermission(grantee, Permission.WriteAcp);
}

View File

@ -43,24 +43,22 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
@SupportsBatching
@SeeAlso({PutS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
@WritesAttributes({
@WritesAttribute(attribute="s3.bucket", description="The name of the S3 bucket"),
@WritesAttribute(attribute="path", description="The path of the file"),
@WritesAttribute(attribute="absolute.path", description="The path of the file"),
@WritesAttribute(attribute="filename", description="The name of the file"),
@WritesAttribute(attribute="hash.value", description="The MD5 sum of the file"),
@WritesAttribute(attribute="hash.algorithm", description="MD5"),
@WritesAttribute(attribute="mime.type", description="If S3 provides the content type/MIME type, this attribute will hold that file"),
@WritesAttribute(attribute="s3.etag", description="The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute="s3.expirationTime", description="If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
@WritesAttribute(attribute="s3.expirationTimeRuleId", description="The ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute="s3.version", description="The version of the S3 object"),
})
@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
@WritesAttribute(attribute = "path", description = "The path of the file"),
@WritesAttribute(attribute = "absolute.path", description = "The path of the file"),
@WritesAttribute(attribute = "filename", description = "The name of the file"),
@WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the file"),
@WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
@WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),})
public class FetchS3Object extends AbstractS3Processor {
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
@ -72,7 +70,7 @@ public class FetchS3Object extends AbstractS3Processor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID) );
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -82,7 +80,7 @@ public class FetchS3Object extends AbstractS3Processor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
if (flowFile == null) {
return;
}
@ -93,7 +91,7 @@ public class FetchS3Object extends AbstractS3Processor {
final AmazonS3 client = getClient();
final GetObjectRequest request;
if ( versionId == null ) {
if (versionId == null) {
request = new GetObjectRequest(bucket, key);
} else {
request = new GetObjectRequest(bucket, key, versionId);
@ -105,10 +103,10 @@ public class FetchS3Object extends AbstractS3Processor {
attributes.put("s3.bucket", s3Object.getBucketName());
final ObjectMetadata metadata = s3Object.getObjectMetadata();
if ( metadata.getContentDisposition() != null ) {
if (metadata.getContentDisposition() != null) {
final String fullyQualified = metadata.getContentDisposition();
final int lastSlash = fullyQualified.lastIndexOf("/");
if ( lastSlash > -1 && lastSlash < fullyQualified.length() - 1 ) {
if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) {
attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
@ -116,41 +114,41 @@ public class FetchS3Object extends AbstractS3Processor {
attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
}
}
if (metadata.getContentMD5() != null ) {
if (metadata.getContentMD5() != null) {
attributes.put("hash.value", metadata.getContentMD5());
attributes.put("hash.algorithm", "MD5");
}
if ( metadata.getContentType() != null ) {
if (metadata.getContentType() != null) {
attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType());
}
if ( metadata.getETag() != null ) {
if (metadata.getETag() != null) {
attributes.put("s3.etag", metadata.getETag());
}
if ( metadata.getExpirationTime() != null ) {
if (metadata.getExpirationTime() != null) {
attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime()));
}
if ( metadata.getExpirationTimeRuleId() != null ) {
if (metadata.getExpirationTimeRuleId() != null) {
attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId());
}
if ( metadata.getUserMetadata() != null ) {
if (metadata.getUserMetadata() != null) {
attributes.putAll(metadata.getUserMetadata());
}
if ( metadata.getVersionId() != null ) {
if (metadata.getVersionId() != null) {
attributes.put("s3.version", metadata.getVersionId());
}
} catch (final IOException | AmazonClientException ioe) {
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[] {flowFile, ioe});
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe});
session.transfer(flowFile, REL_FAILURE);
return;
}
if ( !attributes.isEmpty() ) {
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[] {flowFile, transferMillis});
getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
}

View File

@ -56,17 +56,18 @@ import com.amazonaws.services.s3.model.StorageClass;
@SeeAlso({FetchS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
@DynamicProperty(name="The name of a User-Defined Metadata field to add to the S3 Object",
value="The value of a User-Defined Metadata field to add to the S3 Object",
description="Allows user-defined metadata to be added to the S3 object as key/value pairs",
supportsExpressionLanguage=true)
@ReadsAttribute(attribute="filename", description="Uses the FlowFile's filename as the filename for the S3 object")
@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
value = "The value of a User-Defined Metadata field to add to the S3 Object",
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
supportsExpressionLanguage = true)
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
@WritesAttributes({
@WritesAttribute(attribute="s3.version", description="The version of the S3 Object that was put to S3"),
@WritesAttribute(attribute="s3.etag", description="The ETag of the S3 Object"),
@WritesAttribute(attribute="s3.expiration", description="A human-readable form of the expiration date of the S3 object, if one is set")
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
})
public class PutS3Object extends AbstractS3Processor {
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
.required(false)
@ -83,8 +84,7 @@ public class PutS3Object extends AbstractS3Processor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER) );
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -103,7 +103,7 @@ public class PutS3Object extends AbstractS3Processor {
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
if (flowFile == null) {
return;
}
@ -125,45 +125,45 @@ public class PutS3Object extends AbstractS3Processor {
objectMetadata.setContentLength(ff.getSize());
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
if ( expirationRule != null ) {
if (expirationRule != null) {
objectMetadata.setExpirationTimeRuleId(expirationRule);
}
final Map<String, String> userMetadata = new HashMap<>();
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
if ( entry.getKey().isDynamic() ) {
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
userMetadata.put(entry.getKey().getName(), value);
}
}
if ( !userMetadata.isEmpty() ) {
if (!userMetadata.isEmpty()) {
objectMetadata.setUserMetadata(userMetadata);
}
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
final AccessControlList acl = createACL(context, ff);
if ( acl != null ) {
if (acl != null) {
request.setAccessControlList(acl);
}
final PutObjectResult result = s3.putObject(request);
if ( result.getVersionId() != null ) {
if (result.getVersionId() != null) {
attributes.put("s3.version", result.getVersionId());
}
attributes.put("s3.etag", result.getETag());
final Date expiration = result.getExpirationTime();
if ( expiration != null ) {
if (expiration != null) {
attributes.put("s3.expiration", expiration.toString());
}
}
}
});
if ( !attributes.isEmpty() ) {
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
@ -172,9 +172,9 @@ public class PutS3Object extends AbstractS3Processor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis});
} catch (final ProcessException | AmazonClientException pe) {
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}

View File

@ -28,10 +28,10 @@ import com.amazonaws.services.sns.AmazonSNSClient;
public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
protected static final AllowableValue ARN_TYPE_TOPIC =
new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
protected static final AllowableValue ARN_TYPE_TARGET =
new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
protected static final AllowableValue ARN_TYPE_TOPIC
= new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
protected static final AllowableValue ARN_TYPE_TARGET
= new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
.name("Amazon Resource Name (ARN)")
@ -50,8 +50,6 @@ public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSN
.defaultValue(ARN_TYPE_TOPIC.getValue())
.build();
@Override
protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
return new AmazonSNSClient(credentials, config);

View File

@ -70,7 +70,7 @@ public class PutSNS extends AbstractSNSProcessor {
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT,
USE_JSON_STRUCTURE, CHARACTER_ENCODING) );
USE_JSON_STRUCTURE, CHARACTER_ENCODING));
public static final int MAX_SIZE = 256 * 1024;
@ -90,16 +90,15 @@ public class PutSNS extends AbstractSNSProcessor {
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
if (flowFile == null) {
return;
}
if ( flowFile.getSize() > MAX_SIZE ) {
getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[] {flowFile});
if (flowFile.getSize() > MAX_SIZE) {
getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
session.transfer(flowFile, REL_FAILURE);
return;
}
@ -114,25 +113,25 @@ public class PutSNS extends AbstractSNSProcessor {
final PublishRequest request = new PublishRequest();
request.setMessage(message);
if ( context.getProperty(USE_JSON_STRUCTURE).asBoolean() ) {
if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
request.setMessageStructure("json");
}
final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
final String arnType = context.getProperty(ARN_TYPE).getValue();
if ( arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue()) ) {
if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
request.setTopicArn(arn);
} else {
request.setTargetArn(arn);
}
final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
if ( subject != null ) {
if (subject != null) {
request.setSubject(subject);
}
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
if ( entry.getKey().isDynamic() && !isEmpty(entry.getValue()) ) {
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
final MessageAttributeValue value = new MessageAttributeValue();
value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
value.setDataType("String");
@ -144,9 +143,9 @@ public class PutSNS extends AbstractSNSProcessor {
client.publish(request);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, arn);
getLogger().info("Successfully published notification for {}", new Object[] {flowFile});
getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
} catch (final Exception e) {
getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[] {flowFile, e});
getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}

View File

@ -40,6 +40,7 @@ import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
public class DeleteSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
.name("Receipt Handle")
.description("The identifier that specifies the receipt of the message")
@ -50,18 +51,17 @@ public class DeleteSQS extends AbstractSQSProcessor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT) );
Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(1);
if ( flowFiles.isEmpty() ) {
if (flowFiles.isEmpty()) {
return;
}
@ -74,7 +74,7 @@ public class DeleteSQS extends AbstractSQSProcessor {
final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
for ( final FlowFile flowFile : flowFiles ) {
for (final FlowFile flowFile : flowFiles) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
entries.add(entry);
@ -84,10 +84,10 @@ public class DeleteSQS extends AbstractSQSProcessor {
try {
client.deleteMessageBatch(request);
getLogger().info("Successfully deleted {} objects from SQS", new Object[] {flowFiles.size()});
getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
session.transfer(flowFiles, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[] {flowFiles.size(), e});
getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
session.transfer(flowFiles, REL_FAILURE);
}
}

View File

@ -51,16 +51,17 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@SupportsBatching
@Tags({ "Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
@SeeAlso({PutSQS.class, DeleteSQS.class})
@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
@WritesAttributes({
@WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"),
@WritesAttribute(attribute="hash.algorithm", description="MD5"),
@WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"),
@WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue")
@WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
@WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
@WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
@WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
})
public class GetSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set that should be used to encode the textual content of the SQS message")
@ -94,14 +95,13 @@ public class GetSQS extends AbstractSQSProcessor {
.defaultValue("10")
.build();
public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(QUEUE_URL)
.expressionLanguageSupported(false)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT) );
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -131,28 +131,28 @@ public class GetSQS extends AbstractSQSProcessor {
try {
result = client.receiveMessage(request);
} catch (final Exception e) {
getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[] {e});
getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
context.yield();
return;
}
final List<Message> messages = result.getMessages();
if ( messages.isEmpty() ) {
if (messages.isEmpty()) {
context.yield();
return;
}
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
for ( final Message message : messages ) {
for (final Message message : messages) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
for ( final Map.Entry<String, String> entry : message.getAttributes().entrySet() ) {
for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue());
}
for ( final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet() ) {
for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
}
@ -172,10 +172,10 @@ public class GetSQS extends AbstractSQSProcessor {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, queueUrl);
getLogger().info("Successfully received {} from Amazon SQS", new Object[] {flowFile});
getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
}
if ( autoDelete ) {
if (autoDelete) {
// If we want to auto-delete messages, we must fist commit the session to ensure that the data
// is persisted in NiFi's repositories.
session.commit();
@ -183,7 +183,7 @@ public class GetSQS extends AbstractSQSProcessor {
final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
deleteRequest.setQueueUrl(queueUrl);
final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
for ( final Message message : messages ) {
for (final Message message : messages) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
entry.setId(message.getMessageId());
entry.setReceiptHandle(message.getReceiptHandle());
@ -195,7 +195,7 @@ public class GetSQS extends AbstractSQSProcessor {
try {
client.deleteMessageBatch(deleteRequest);
} catch (final Exception e) {
getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[] {messages.size(), e});
getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
}
}

View File

@ -44,14 +44,13 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
@SupportsBatching
@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
@SeeAlso({GetSQS.class, DeleteSQS.class})
@CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute",
description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+ "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage=true)
@DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute",
description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+ "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage = true)
public class PutSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
@ -63,7 +62,7 @@ public class PutSQS extends AbstractSQSProcessor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT) );
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT));
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
@ -86,8 +85,8 @@ public class PutSQS extends AbstractSQSProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
userDefinedProperties = new ArrayList<>();
for ( final PropertyDescriptor descriptor : context.getProperties().keySet() ) {
if ( descriptor.isDynamic() ) {
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
userDefinedProperties.add(descriptor);
}
}
@ -96,7 +95,7 @@ public class PutSQS extends AbstractSQSProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
if (flowFile == null) {
return;
}
@ -117,7 +116,7 @@ public class PutSQS extends AbstractSQSProcessor {
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
for ( final PropertyDescriptor descriptor : userDefinedProperties ) {
for (final PropertyDescriptor descriptor : userDefinedProperties) {
final MessageAttributeValue mav = new MessageAttributeValue();
mav.setDataType("String");
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
@ -133,12 +132,12 @@ public class PutSQS extends AbstractSQSProcessor {
try {
client.sendMessageBatch(request);
} catch (final Exception e) {
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[] {e});
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
session.transfer(flowFile, REL_FAILURE);
return;
}
getLogger().info("Successfully published message to Amazon SQS for {}", new Object[] {flowFile});
getLogger().info("Successfully published message to Amazon SQS for {}", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);

View File

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
@ -15,6 +31,7 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestFetchS3Object {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
@ -36,7 +53,7 @@ public class TestFetchS3Object {
final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
out.assertContentEquals(new String(expectedBytes));
for ( final Map.Entry<String, String> entry : out.getAttributes().entrySet() ) {
for (final Map.Entry<String, String> entry : out.getAttributes().entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}

View File

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
@ -24,9 +40,9 @@ public class TestPutS3Object {
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
for (int i=0; i < 3; i++) {
for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
@ -42,7 +58,7 @@ public class TestPutS3Object {
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/1.txt");
@ -52,14 +68,13 @@ public class TestPutS3Object {
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
@Test
public void testStorageClass() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/2.txt");

View File

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sns;
import static org.junit.Assert.assertTrue;
@ -14,6 +30,7 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutSNS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
@ -21,7 +38,7 @@ public class TestPutSNS {
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1");
assertTrue( runner.setProperty("DynamicProperty", "hello!").isValid() );
assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");

View File

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sqs;
import java.util.List;
@ -11,6 +27,7 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestGetSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
@ -23,7 +40,7 @@ public class TestGetSQS {
runner.run(1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
for ( final MockFlowFile mff : flowFiles ) {
for (final MockFlowFile mff : flowFiles) {
System.out.println(mff.getAttributes());
System.out.println(new String(mff.toByteArray()));
}

View File

@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.sqs;
import java.io.IOException;
@ -14,6 +30,7 @@ import org.junit.Test;
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestPutSQS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
@Test
@ -22,7 +39,7 @@ public class TestPutSQS {
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");

View File

@ -0,0 +1,68 @@
nifi-geo-nar
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2014 The Apache Software Foundation
Apache HttpCore
Copyright 2005-2014 The Apache Software Foundation
This project contains annotations derived from JCIP-ANNOTATIONS
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) GeoIP2 Java API
The following NOTICE information applies:
GeoIP2 Java API
This software is Copyright (c) 2013 by MaxMind, Inc.
************************
Creative Commons Attribution-ShareAlike 3.0
************************
The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)

View File

@ -0,0 +1,29 @@
nifi-hl7-nar
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
*****************
Mozilla Public License v1.1
*****************
The following binary components are provided under the Mozilla Public License v1.1. See project link for details.
(MPL 1.1) HAPI Base (ca.uhn.hapi:hapi-base:2.2 - http://hl7api.sourceforge.net/)
(MPL 1.1) HAPI Structures (ca.uhn.hapi:hapi-structures-v*:2.2 - http://hl7api.sourceforge.net/)

View File

@ -0,0 +1,57 @@
nifi-social-media-nar
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Twitter4J
The following NOTICE information applies:
Copyright 2007 Yusuke Yamamoto
Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html
(ASLv2) JOAuth
The following NOTICE information applies:
JOAuth
Copyright 2010-2013 Twitter, Inc
(ASLv2) Hosebird Client
The following NOTICE information applies:
Hosebird Client (hbc)
Copyright 2013 Twitter, Inc.