NIFI-965 penalized failed flowfiles in S3 Processors. Also discovered a typo in a static final that I corrected

Signed-off-by: Mike Moser <mosermw@apache.org>
This commit is contained in:
Tony Kurc 2015-11-07 23:27:18 -05:00 committed by Mike Moser
parent cf7bfe9e1c
commit 6ed5f28f26
14 changed files with 33 additions and 24 deletions

View File

@ -58,7 +58,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
.name("Credentials File")
.expressionLanguageSupported(false)
.required(false)
@ -127,7 +127,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
}
final boolean credentialsFileSet = validationContext.getProperty(CREDENTAILS_FILE).isSet();
final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
if ((secretKeySet || accessKeySet) && credentialsFileSet) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
}
@ -180,7 +180,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
final String accessKey = context.getProperty(ACCESS_KEY).getValue();
final String secretKey = context.getProperty(SECRET_KEY).getValue();
final String credentialsFile = context.getProperty(CREDENTAILS_FILE).getValue();
final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
if (credentialsFile != null) {
try {

View File

@ -53,7 +53,7 @@ public class DeleteS3Object extends AbstractS3Processor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
@ -88,6 +88,7 @@ public class DeleteS3Object extends AbstractS3Processor {
}
} catch (final AmazonServiceException ase) {
getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}

View File

@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID));
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -141,6 +141,7 @@ public class FetchS3Object extends AbstractS3Processor {
}
} catch (final IOException | AmazonClientException ioe) {
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}

View File

@ -84,7 +84,7 @@ public class PutS3Object extends AbstractS3Processor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
@Override
@ -177,6 +177,7 @@ public class PutS3Object extends AbstractS3Processor {
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
} catch (final ProcessException | AmazonClientException pe) {
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}

View File

@ -74,7 +74,7 @@ public class PutSNS extends AbstractSNSProcessor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT,
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
USE_JSON_STRUCTURE, CHARACTER_ENCODING));
public static final int MAX_SIZE = 256 * 1024;
@ -151,6 +151,7 @@ public class PutSNS extends AbstractSNSProcessor {
getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
} catch (final Exception e) {
getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}

View File

@ -91,7 +91,11 @@ public class DeleteSQS extends AbstractSQSProcessor {
session.transfer(flowFiles, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e});
session.transfer(flowFiles, REL_FAILURE);
final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
for (final FlowFile flowFile : flowFiles) {
penalizedFlowFiles.add(session.penalize(flowFile));
}
session.transfer(penalizedFlowFiles, REL_FAILURE);
}
}

View File

@ -104,7 +104,7 @@ public class GetSQS extends AbstractSQSProcessor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -65,7 +65,7 @@ public class PutSQS extends AbstractSQSProcessor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT));
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, DELAY, TIMEOUT));
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
@ -136,6 +136,7 @@ public class PutSQS extends AbstractSQSProcessor {
client.sendMessageBatch(request);
} catch (final Exception e) {
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}

View File

@ -35,7 +35,7 @@ public class TestDeleteS3Object extends AbstractS3Test {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
@ -55,7 +55,7 @@ public class TestDeleteS3Object extends AbstractS3Test {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
@ -75,7 +75,7 @@ public class TestDeleteS3Object extends AbstractS3Test {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(DeleteS3Object.KEY, "folder/delete-me");
@ -93,7 +93,7 @@ public class TestDeleteS3Object extends AbstractS3Test {
public void testTryToDeleteNotExistingFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object());
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, REGION);
runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);

View File

@ -36,7 +36,7 @@ public class TestFetchS3Object extends AbstractS3Test {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
@ -53,7 +53,7 @@ public class TestFetchS3Object extends AbstractS3Test {
public void testTryToFetchNotExistingFile() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
@ -73,7 +73,7 @@ public class TestFetchS3Object extends AbstractS3Test {
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);

View File

@ -35,7 +35,7 @@ public class TestPutS3Object extends AbstractS3Test {
public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
@ -55,7 +55,7 @@ public class TestPutS3Object extends AbstractS3Test {
public void testPutInFolder() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
@ -74,7 +74,7 @@ public class TestPutS3Object extends AbstractS3Test {
public void testStorageClass() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
@ -94,7 +94,7 @@ public class TestPutS3Object extends AbstractS3Test {
public void testPermissions() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
runner.setProperty(PutS3Object.REGION, REGION);

View File

@ -36,7 +36,7 @@ public class TestPutSNS {
@Test
public void testPublish() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1");
assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());

View File

@ -33,7 +33,7 @@ public class TestGetSQS {
@Test
public void testSimpleGet() {
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");

View File

@ -36,7 +36,7 @@ public class TestPutSQS {
@Test
public void testSimplePut() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());