From e2ca50e66a3b1a7d810ea8eac256d21bca3fd07f Mon Sep 17 00:00:00 2001 From: Evan Reynolds Date: Mon, 1 Jul 2019 17:26:05 -0700 Subject: [PATCH] NIFI-6367 - This closes #3563. more error handling for FetchS3Object Signed-off-by: Joe Witt --- .../nifi/processors/aws/s3/FetchS3Object.java | 13 ++++++++ .../processors/aws/s3/TestFetchS3Object.java | 30 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 8dabeb9706..aa6233d1cf 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -39,6 +40,7 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.util.StandardValidators; import com.amazonaws.AmazonClientException; @@ -129,6 +131,9 @@ public class FetchS3Object extends AbstractS3Processor { } try (final S3Object s3Object = client.getObject(request)) { + if (s3Object == null) { + throw new IOException("AWS refused to execute this request."); + } flowFile = session.importFrom(s3Object.getObjectContent(), flowFile); attributes.put("s3.bucket", s3Object.getBucketName()); @@ -174,6 +179,14 @@ public class FetchS3Object extends AbstractS3Processor { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); return; + } catch (final FlowFileAccessException ffae) { + if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) { + getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + throw ffae; } if (!attributes.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index 6416d530e1..ed3fdfe3c1 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -234,6 +235,35 @@ public class TestFetchS3Object { runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1); } + @Test + public void testGetObjectReturnsNull() throws IOException { + runner.setProperty(FetchS3Object.REGION, "us-east-1"); + runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "request-key"); + runner.enqueue(new byte[0], attrs); + Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1); + } + + @Test + public void testFlowFileAccessExceptionGoesToFailure() throws IOException { + runner.setProperty(FetchS3Object.REGION, "us-east-1"); + runner.setProperty(FetchS3Object.BUCKET, "request-bucket"); + final Map attrs = new HashMap<>(); + attrs.put("filename", "request-key"); + runner.enqueue(new byte[0], attrs); + + AmazonS3Exception amazonException = new AmazonS3Exception("testing"); + Mockito.doThrow(new FlowFileAccessException("testing nested", amazonException)).when(mockS3Client).getObject(Mockito.any()); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1); + } @Test public void testGetPropertyDescriptors() throws Exception { FetchS3Object processor = new FetchS3Object();