diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java new file mode 100644 index 0000000000..3be7a15bad --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.DeleteVersionRequest; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + + +@SupportsBatching +@SeeAlso({PutS3Object.class}) +@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"}) +@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " + + "And the FlowFiles are checked if exists or not before deleting.") +public class DeleteS3Object extends AbstractS3Processor { + + public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() + .name("Version") + .description("The Version of the Object to delete") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + public static final List properties = Collections.unmodifiableList( + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); + + protected List getSupportedPropertyDescriptors() { + return properties; + } + + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3 s3 = getClient(); + try { + // Checks if the key exists or not + // If there is no such a key, then throws a exception + s3.getObjectMetadata(bucket, key); + + // Deletes a key on Amazon S3 + if (versionId == null) { + final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key); + s3.deleteObject(r); + } else { + final DeleteVersionRequest r = new DeleteVersionRequest(bucket, key, versionId); + s3.deleteVersion(r); + } + } catch (final AmazonServiceException ase) { + getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 4f2405c91d..d0d1e735bd 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,6 +14,7 @@ # limitations under the License. org.apache.nifi.processors.aws.s3.FetchS3Object org.apache.nifi.processors.aws.s3.PutS3Object +org.apache.nifi.processors.aws.s3.DeleteS3Object org.apache.nifi.processors.aws.sns.PutSNS org.apache.nifi.processors.aws.sqs.GetSQS org.apache.nifi.processors.aws.sqs.PutSQS diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java new file mode 100644 index 0000000000..dfe6edb5d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.*; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + + +@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") +public class TestDeleteS3Object { + + private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + // When you want to test this, you should create a bucket on Amazon S3 as follows. + private static final String TEST_REGION = "ap-northeast-1"; + private static final String TEST_BUCKET = "test-bucket-00000000-0000-0000-0000-1234567890123"; + + @BeforeClass + public static void oneTimeSetUp() { + // Creates a new bucket for this test + try { + PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE)); + AmazonS3Client client = new AmazonS3Client(credentials); + CreateBucketRequest request = new CreateBucketRequest(TEST_BUCKET, TEST_REGION); + client.createBucket(request); + } catch (final AmazonS3Exception e) { + System.out.println(TEST_BUCKET + " already exists."); + } catch (final IOException e) { + System.out.println(CREDENTIALS_FILE + " doesn't exist."); + } + } + + @AfterClass + public static void oneTimeTearDown() throws IOException { + // Delete a bucket for this test + PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE)); + AmazonS3Client client = new AmazonS3Client(credentials); + DeleteBucketRequest dbr = new DeleteBucketRequest(TEST_BUCKET); + client.deleteBucket(dbr); + } + + @Test + public void testSimpleDelete() throws IOException { + // Prepares for this test + uploadTestFile("hello.txt"); + + DeleteS3Object deleter = new DeleteS3Object(); + final TestRunner runner = TestRunners.newTestRunner(deleter); + runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); + runner.setProperty(DeleteS3Object.REGION, TEST_REGION); + runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET); + runner.setProperty(DeleteS3Object.KEY, "hello.txt"); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); + } + + @Test + public void testDeleteFolder() throws IOException { + // Prepares for this test + uploadTestFile("folder/1.txt"); + + DeleteS3Object deleter = new DeleteS3Object(); + final TestRunner runner = TestRunners.newTestRunner(deleter); + runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); + runner.setProperty(DeleteS3Object.REGION, TEST_REGION); + runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET); + runner.setProperty(DeleteS3Object.KEY, "folder/1.txt"); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); + } + + @Test + public void testTryToDeleteNotExistingFile() throws IOException { + DeleteS3Object deleter = new DeleteS3Object(); + final TestRunner runner = TestRunners.newTestRunner(deleter); + runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE); + runner.setProperty(DeleteS3Object.REGION, TEST_REGION); + runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET); + runner.setProperty(DeleteS3Object.BUCKET, "no-such-a-key"); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "no-such-a-file"); + runner.enqueue(new byte[0], attrs); + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1); + } + + // Uploads a test file + private void uploadTestFile(String key) throws IOException { + PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE)); + AmazonS3Client client = new AmazonS3Client(credentials); + URL fileURL = this.getClass().getClassLoader().getResource("hello.txt"); + File file = new File(fileURL.getPath()); + PutObjectRequest putRequest = new PutObjectRequest(TEST_BUCKET, key, file); + PutObjectResult result = client.putObject(putRequest); + } +}