diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml index 4c34715583..3e6f450181 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml @@ -19,18 +19,17 @@ org.apache.nifi nifi-aws-bundle - 0.0.2-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nifi-aws-nar - 0.0.2-incubating-SNAPSHOT nar org.apache.nifi nifi-aws-processors - 0.0.2-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index 2cb1302ae4..227077344e 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-aws-bundle - 0.0.2-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nifi-aws-processors diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index dde0d1d69a..11c6a9d69f 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws; import java.io.File; diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index 4849357b00..624015b6a5 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.s3; import java.util.ArrayList; diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java similarity index 65% rename from nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java rename to nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 6ba1bbfd20..63c834630c 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.s3; import java.io.IOException; @@ -8,10 +24,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -25,11 +44,26 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; -@Tags({"Amazon", "S3", "AWS", "Get"}) +@SupportsBatching +@SeeAlso({PutS3Object.class}) +@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") -public class GetS3Object extends AbstractS3Processor { +@WritesAttributes({ + @WritesAttribute(attribute="s3.bucket", description="The name of the S3 bucket"), + @WritesAttribute(attribute="path", description="The path of the file"), + @WritesAttribute(attribute="absolute.path", description="The path of the file"), + @WritesAttribute(attribute="filename", description="The name of the file"), + @WritesAttribute(attribute="hash.value", description="The MD5 sum of the file"), + @WritesAttribute(attribute="hash.algorithm", description="MD5"), + @WritesAttribute(attribute="mime.type", description="If S3 provides the content type/MIME type, this attribute will hold that file"), + @WritesAttribute(attribute="s3.etag", description="The ETag that can be used to see if the file has changed"), + @WritesAttribute(attribute="s3.expirationTime", description="If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"), + @WritesAttribute(attribute="s3.expirationTimeRuleId", description="The ID of the rule that dictates this object's expiration time"), + @WritesAttribute(attribute="s3.version", description="The version of the S3 object"), +}) +public class FetchS3Object extends AbstractS3Processor { - public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() + public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() .name("Version") .description("The Version of the Object to download") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -37,25 +71,8 @@ public class GetS3Object extends AbstractS3Processor { .required(false) .build(); - public static final PropertyDescriptor BYTE_RANGE_START = new PropertyDescriptor.Builder() - .name("First Byte Index") - .description("The 0-based index of the first byte to download. If specified, the first N bytes will be skipped, where N is the value of this property. If this value is greater than the size of the object, the FlowFile will be routed to failure.") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor BYTE_RANGE_END = new PropertyDescriptor.Builder() - .name("Last Byte Index") - .description("The 0-based index of the last byte to download. If specified, last N bytes will be skipped, where N is the size of the object minus the value of this property. If the value is greater than the size of the object, the content will be downloaded to the end of the object.") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final List properties = Collections.unmodifiableList( - Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID, - BYTE_RANGE_START, BYTE_RANGE_END) ); + Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID) ); @Override protected List getSupportedPropertyDescriptors() { @@ -82,30 +99,6 @@ public class GetS3Object extends AbstractS3Processor { request = new GetObjectRequest(bucket, key, versionId); } - final Long byteRangeStart; - final Long byteRangeEnd; - try { - final PropertyValue startVal = context.getProperty(BYTE_RANGE_START).evaluateAttributeExpressions(flowFile); - byteRangeStart = startVal.isSet() ? startVal.asLong() : 0L; - - final PropertyValue endVal = context.getProperty(BYTE_RANGE_END).evaluateAttributeExpressions(flowFile); - byteRangeEnd = endVal.isSet() ? endVal.asLong() : Long.MAX_VALUE; - } catch (final NumberFormatException nfe) { - getLogger().error("Failed to determine byte range for download for {} due to {}", new Object[] {flowFile, nfe}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - if ( byteRangeStart != null && byteRangeEnd != null ) { - if ( byteRangeEnd.longValue() < byteRangeStart.longValue() ) { - getLogger().error("Failed to download object from S3 for {} because Start Byte Range is {} and End Byte Range is {}, which is less", new Object[] {flowFile, byteRangeStart, byteRangeEnd}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - request.setRange(byteRangeStart.longValue(), byteRangeEnd.longValue()); - } - final Map attributes = new HashMap<>(); try (final S3Object s3Object = client.getObject(request)) { flowFile = session.importFrom(s3Object.getObjectContent(), flowFile); @@ -160,6 +153,5 @@ public class GetS3Object extends AbstractS3Processor { getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[] {flowFile, transferMillis}); session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); } - } diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 1fe5db11f7..9a4fc5b995 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.s3; import java.io.BufferedInputStream; @@ -11,7 +27,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -30,11 +52,21 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.StorageClass; - +@SupportsBatching +@SeeAlso({FetchS3Object.class}) @Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") +@DynamicProperty(name="The name of a User-Defined Metadata field to add to the S3 Object", + value="The value of a User-Defined Metadata field to add to the S3 Object", + description="Allows user-defined metadata to be added to the S3 object as key/value pairs", + supportsExpressionLanguage=true) +@ReadsAttribute(attribute="filename", description="Uses the FlowFile's filename as the filename for the S3 object") +@WritesAttributes({ + @WritesAttribute(attribute="s3.version", description="The version of the S3 Object that was put to S3"), + @WritesAttribute(attribute="s3.etag", description="The ETag of the S3 Object"), + @WritesAttribute(attribute="s3.expiration", description="A human-readable form of the expiration date of the S3 object, if one is set") +}) public class PutS3Object extends AbstractS3Processor { - public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() .name("Expiration Time Rule") .required(false) diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java index d2404f36c9..5447169034 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sns; import org.apache.nifi.components.AllowableValue; diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index bc690eb31b..1de3251ce8 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sns; import java.io.ByteArrayOutputStream; @@ -7,24 +23,24 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.sqs.GetSQS; +import org.apache.nifi.processors.aws.sqs.PutSQS; import com.amazonaws.services.sns.AmazonSNSClient; import com.amazonaws.services.sns.model.MessageAttributeValue; import com.amazonaws.services.sns.model.PublishRequest; - -// TODO: Allow user to choose 'content strategy' -// 1. Content from message body or attributes? If attributes, allow new property for configuring EL expression. Otherwise, no property. -// 2. Same content to all subscribers or different content per subscription type? -// If same, just use single property. -// If different, must use Attribute values for each and have a separate property for each type of subscription (HTTP, HTTPS, E-mail, SMS, etc.) +@SupportsBatching +@SeeAlso({GetSQS.class, PutSQS.class}) @Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"}) @CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service") public class PutSNS extends AbstractSNSProcessor { diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java index 72eec4c9d2..2ef749f6d9 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sqs; import org.apache.nifi.components.PropertyDescriptor; diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index 3659a50eda..2416044af1 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sqs; import java.util.ArrayList; @@ -5,7 +21,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -17,8 +35,9 @@ import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; - -@Tags({"Amazon", "SQS", "Queue", "Delete"}) +@SupportsBatching +@SeeAlso({GetSQS.class, PutSQS.class}) +@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"}) @CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue") public class DeleteSQS extends AbstractSQSProcessor { public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder() @@ -41,9 +60,6 @@ public class DeleteSQS extends AbstractSQSProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { - // TODO: Process batch of FlowFiles. To do this, we have to first poll the first - // FlowFile and then use session.get(FlowFileFilter) to get up to BATCH_SIZE - 1 - // more results that have the same Queue URL. List flowFiles = session.get(1); if ( flowFiles.isEmpty() ) { return; diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index 1af37805e5..6c0257bf45 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sqs; import java.io.IOException; @@ -12,7 +28,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -30,10 +50,17 @@ import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; -@Tags({ "Amazon", "SQS", "Queue", "Get", "Fetch", "Poll"}) +@SupportsBatching +@Tags({ "Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) +@SeeAlso({PutSQS.class, DeleteSQS.class}) @CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue") +@WritesAttributes({ + @WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"), + @WritesAttribute(attribute="hash.algorithm", description="MD5"), + @WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"), + @WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue") +}) public class GetSQS extends AbstractSQSProcessor { - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() .name("Character Set") .description("The Character Set that should be used to encode the textual content of the SQS message") diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java index cac4d73b65..81268fe2cd 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.aws.sqs; import java.io.ByteArrayOutputStream; @@ -11,7 +27,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -26,8 +45,13 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; -@Tags({"Amazon", "SQS", "Queue", "Put", "Publish"}) +@SupportsBatching +@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"}) +@SeeAlso({GetSQS.class, DeleteSQS.class}) @CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue") +@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute", + description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of " + + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage=true) public class PutSQS extends AbstractSQSProcessor { public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder() @@ -71,10 +95,6 @@ public class PutSQS extends AbstractSQSProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { - // TODO: Process batch of FlowFiles. To do this, we have to first poll the first - // FlowFile and then use session.get(FlowFileFilter) to get up to BATCH_SIZE - 1 - // more results that have the same Queue URL. - FlowFile flowFile = session.get(); if ( flowFile == null ) { return; diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 32a395a7ad..4f2405c91d 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.aws.s3.GetS3Object +org.apache.nifi.processors.aws.s3.FetchS3Object org.apache.nifi.processors.aws.s3.PutS3Object org.apache.nifi.processors.aws.sns.PutSNS org.apache.nifi.processors.aws.sqs.GetSQS diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java similarity index 70% rename from nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java rename to nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index 391c8b4299..40f951538b 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -14,18 +14,15 @@ import org.junit.Ignore; import org.junit.Test; @Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") -public class TestGetS3Object { +public class TestFetchS3Object { private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; @Test public void testGet() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new GetS3Object()); - runner.setProperty(GetS3Object.BUCKET, "anonymous-test-bucket-00000000"); - runner.setProperty(GetS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); - runner.setProperty(GetS3Object.KEY, "folder/1.txt"); - - runner.setProperty(GetS3Object.BYTE_RANGE_START, "${start}"); - runner.setProperty(GetS3Object.BYTE_RANGE_END, String.valueOf(Long.MAX_VALUE)); + final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); + runner.setProperty(FetchS3Object.BUCKET, "anonymous-test-bucket-00000000"); + runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); + runner.setProperty(FetchS3Object.KEY, "folder/1.txt"); final Map attrs = new HashMap<>(); attrs.put("start", "0"); @@ -33,8 +30,8 @@ public class TestGetS3Object { runner.enqueue(new byte[0], attrs); runner.run(1); - runner.assertAllFlowFilesTransferred(GetS3Object.REL_SUCCESS, 1); - final List ffs = runner.getFlowFilesForRelationship(GetS3Object.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + final List ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); final MockFlowFile out = ffs.iterator().next(); final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt")); diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml index 81a54e4dc3..117d7dd1c4 100644 --- a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml @@ -19,7 +19,7 @@ org.apache.nifi nifi-nar-bundles - 0.0.2-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nifi-aws-bundle