mirror of https://github.com/apache/nifi.git
Add a relation for "not found" and that is transfered if a target key doesn't exist on S3
This commit is contained in:
parent
213f507f53
commit
0334f04640
|
@ -54,7 +54,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
||||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||||
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
|
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
|
||||||
|
|
||||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
public static Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||||
|
|
||||||
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
|
||||||
|
|
|
@ -18,14 +18,15 @@ package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.amazonaws.AmazonServiceException;
|
import com.amazonaws.AmazonServiceException;
|
||||||
import com.amazonaws.services.s3.AmazonS3;
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
||||||
import com.amazonaws.services.s3.model.DeleteVersionRequest;
|
import com.amazonaws.services.s3.model.DeleteVersionRequest;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,6 +46,9 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
"And the FlowFiles are checked if exists or not before deleting.")
|
"And the FlowFiles are checked if exists or not before deleting.")
|
||||||
public class DeleteS3Object extends AbstractS3Processor {
|
public class DeleteS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
|
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
|
||||||
|
.description("FlowFiles are routed to 'not found' if it doesn't exist on Amazon S3").build();
|
||||||
|
|
||||||
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
|
||||||
.name("Version")
|
.name("Version")
|
||||||
.description("The Version of the Object to delete")
|
.description("The Version of the Object to delete")
|
||||||
|
@ -56,6 +61,14 @@ public class DeleteS3Object extends AbstractS3Processor {
|
||||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
|
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
|
||||||
|
|
||||||
|
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
|
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_NOT_FOUND)));
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
return properties;
|
return properties;
|
||||||
|
@ -75,12 +88,19 @@ public class DeleteS3Object extends AbstractS3Processor {
|
||||||
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
|
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
|
||||||
final AmazonS3 s3 = getClient();
|
final AmazonS3 s3 = getClient();
|
||||||
try {
|
|
||||||
// Checks if the key exists or not
|
|
||||||
// If there is no such a key, then throws a exception
|
|
||||||
s3.getObjectMetadata(bucket, key);
|
|
||||||
|
|
||||||
// Deletes a key on Amazon S3
|
// Checks if the key exists or not
|
||||||
|
// If there is no such a key, then throws a exception
|
||||||
|
try {
|
||||||
|
s3.getObjectMetadata(bucket, key);
|
||||||
|
} catch (final AmazonServiceException ase) {
|
||||||
|
getLogger().error("Not found sucha a file and folder on Amazon S3 {}", new Object[]{flowFile, ase});
|
||||||
|
session.transfer(flowFile, REL_NOT_FOUND);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deletes a key on Amazon S3
|
||||||
|
try {
|
||||||
if (versionId == null) {
|
if (versionId == null) {
|
||||||
final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key);
|
final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key);
|
||||||
s3.deleteObject(r);
|
s3.deleteObject(r);
|
||||||
|
|
|
@ -22,15 +22,21 @@ import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import com.amazonaws.auth.PropertiesCredentials;
|
import com.amazonaws.auth.PropertiesCredentials;
|
||||||
import com.amazonaws.services.s3.AmazonS3Client;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
import com.amazonaws.services.s3.model.*;
|
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||||
|
import com.amazonaws.services.s3.model.CreateBucketRequest;
|
||||||
|
import com.amazonaws.services.s3.model.DeleteBucketRequest;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
|
||||||
|
@ -122,7 +128,14 @@ public class TestDeleteS3Object {
|
||||||
runner.enqueue(new byte[0], attrs);
|
runner.enqueue(new byte[0], attrs);
|
||||||
runner.run(1);
|
runner.run(1);
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
|
runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_NOT_FOUND, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetRelationships() {
|
||||||
|
DeleteS3Object deleter = new DeleteS3Object();
|
||||||
|
Set<Relationship> relationships = deleter.getRelationships();
|
||||||
|
assertEquals(relationships.size(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Uploads a test file
|
// Uploads a test file
|
||||||
|
|
Loading…
Reference in New Issue