From 0334f04640c7ae0b070214df7e356b2b05a8051c Mon Sep 17 00:00:00 2001 From: Yu ISHIKAWA Date: Wed, 2 Sep 2015 13:03:05 +0900 Subject: [PATCH] Add a relation for "not found" and that is transfered if a target key doesn't exist on S3 --- .../processors/aws/AbstractAWSProcessor.java | 2 +- .../processors/aws/s3/DeleteS3Object.java | 32 +++++++++++++++---- .../processors/aws/s3/TestDeleteS3Object.java | 19 +++++++++-- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index a781ff982a..e2ae31eb70 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -54,7 +54,7 @@ public abstract class AbstractAWSProcessor relationships = Collections.unmodifiableSet( + public static Set relationships = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index 2cc00dbe3d..803a6abf19 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -18,14 +18,15 @@ package org.apache.nifi.processors.aws.s3; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteVersionRequest; - import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -34,6 +35,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; @@ -44,6 +46,9 @@ import org.apache.nifi.processor.util.StandardValidators; "And the FlowFiles are checked if exists or not before deleting.") public class DeleteS3Object extends AbstractS3Processor { + public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") + .description("FlowFiles are routed to 'not found' if it doesn't exist on Amazon S3").build(); + public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() .name("Version") .description("The Version of the Object to delete") @@ -56,6 +61,14 @@ public class DeleteS3Object extends AbstractS3Processor { Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); + public static final Set relationships = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_NOT_FOUND))); + + @Override + public Set getRelationships() { + return relationships; + } + @Override protected List getSupportedPropertyDescriptors() { return properties; @@ -75,12 +88,19 @@ public class DeleteS3Object extends AbstractS3Processor { final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); final AmazonS3 s3 = getClient(); - try { - // Checks if the key exists or not - // If there is no such a key, then throws a exception - s3.getObjectMetadata(bucket, key); - // Deletes a key on Amazon S3 + // Checks if the key exists or not + // If there is no such a key, then throws a exception + try { + s3.getObjectMetadata(bucket, key); + } catch (final AmazonServiceException ase) { + getLogger().error("Not found sucha a file and folder on Amazon S3 {}", new Object[]{flowFile, ase}); + session.transfer(flowFile, REL_NOT_FOUND); + return; + } + + // Deletes a key on Amazon S3 + try { if (versionId == null) { final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key); s3.deleteObject(r); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java index cac55e5ad8..04d9e61e32 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java @@ -22,15 +22,21 @@ import java.io.IOException; import java.net.URL; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; import com.amazonaws.auth.PropertiesCredentials; import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.*; - +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CreateBucketRequest; +import com.amazonaws.services.s3.model.DeleteBucketRequest; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -122,7 +128,14 @@ public class TestDeleteS3Object { runner.enqueue(new byte[0], attrs); runner.run(1); - runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1); + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_NOT_FOUND, 1); + } + + @Test + public void testGetRelationships() { + DeleteS3Object deleter = new DeleteS3Object(); + Set relationships = deleter.getRelationships(); + assertEquals(relationships.size(), 3); } // Uploads a test file