Merge remote-tracking branch 'yu/NIFI-774'

This commit is contained in:
danbress 2015-10-12 09:55:52 -04:00
commit fad81d872d
4 changed files with 240 additions and 1 deletions

View File

@ -54,7 +54,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
public static Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@SeeAlso({PutS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
"And the FlowFiles are checked if exists or not before deleting.")
public class DeleteS3Object extends AbstractS3Processor {
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
.name("Version")
.description("The Version of the Object to delete")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final AmazonS3 s3 = getClient();
// Deletes a key on Amazon S3
try {
if (versionId == null) {
final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key);
s3.deleteObject(r);
} else {
final DeleteVersionRequest r = new DeleteVersionRequest(bucket, key, versionId);
s3.deleteVersion(r);
}
} catch (final AmazonServiceException ase) {
getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
session.transfer(flowFile, REL_FAILURE);
return;
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
}
}

View File

@ -14,6 +14,7 @@
# limitations under the License.
org.apache.nifi.processors.aws.s3.FetchS3Object
org.apache.nifi.processors.aws.s3.PutS3Object
org.apache.nifi.processors.aws.s3.DeleteS3Object
org.apache.nifi.processors.aws.sns.PutSNS
org.apache.nifi.processors.aws.sqs.GetSQS
org.apache.nifi.processors.aws.sqs.PutSQS

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.DeleteBucketRequest;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
//@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestDeleteS3Object {
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
// When you want to test this, you should create a bucket on Amazon S3 as follows.
private static final String TEST_REGION = "ap-northeast-1";
private static final String TEST_BUCKET = "test-bucket-00000000-0000-0000-0000-1234567890123";
@BeforeClass
public static void oneTimeSetUp() {
// Creates a new bucket for this test
try {
PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
AmazonS3Client client = new AmazonS3Client(credentials);
CreateBucketRequest request = new CreateBucketRequest(TEST_BUCKET, TEST_REGION);
client.createBucket(request);
} catch (final AmazonS3Exception e) {
System.out.println("Can't create the key " + TEST_BUCKET + ":" + e.toString());
} catch (final IOException e) {
System.out.println(CREDENTIALS_FILE + " doesn't exist.");
}
}
@AfterClass
public static void oneTimeTearDown() throws IOException {
// Delete a bucket for this test
PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
AmazonS3Client client = new AmazonS3Client(credentials);
DeleteBucketRequest dbr = new DeleteBucketRequest(TEST_BUCKET);
client.deleteBucket(dbr);
}
@Test
public void testSimpleDelete() throws IOException {
// Prepares for this test
uploadTestFile("hello.txt");
DeleteS3Object deleter = new DeleteS3Object();
final TestRunner runner = TestRunners.newTestRunner(deleter);
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
runner.setProperty(DeleteS3Object.KEY, "hello.txt");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}
@Test
public void testDeleteFolder() throws IOException {
// Prepares for this test
uploadTestFile("folder/1.txt");
DeleteS3Object deleter = new DeleteS3Object();
final TestRunner runner = TestRunners.newTestRunner(deleter);
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
runner.setProperty(DeleteS3Object.KEY, "folder/1.txt");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}
@Test
public void testTryToDeleteNotExistingFile() throws IOException {
DeleteS3Object deleter = new DeleteS3Object();
final TestRunner runner = TestRunners.newTestRunner(deleter);
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
runner.setProperty(DeleteS3Object.BUCKET, "no-such-a-key");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "no-such-a-file");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
}
// Uploads a test file
private void uploadTestFile(String key) throws IOException {
PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
AmazonS3Client client = new AmazonS3Client(credentials);
URL fileURL = this.getClass().getClassLoader().getResource("hello.txt");
File file = new File(fileURL.getPath());
PutObjectRequest putRequest = new PutObjectRequest(TEST_BUCKET, key, file);
PutObjectResult result = client.putObject(putRequest);
}
}