mirror of https://github.com/apache/nifi.git
NIFI-6367 - This closes #3563. more error handling for FetchS3Object
Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
parent
b78aeb67a1
commit
e2ca50e66a
|
@ -24,6 +24,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
@ -39,6 +40,7 @@ import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
|
@ -129,6 +131,9 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
}
|
}
|
||||||
|
|
||||||
try (final S3Object s3Object = client.getObject(request)) {
|
try (final S3Object s3Object = client.getObject(request)) {
|
||||||
|
if (s3Object == null) {
|
||||||
|
throw new IOException("AWS refused to execute this request.");
|
||||||
|
}
|
||||||
flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
|
flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
|
||||||
attributes.put("s3.bucket", s3Object.getBucketName());
|
attributes.put("s3.bucket", s3Object.getBucketName());
|
||||||
|
|
||||||
|
@ -174,6 +179,14 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
flowFile = session.penalize(flowFile);
|
flowFile = session.penalize(flowFile);
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
return;
|
return;
|
||||||
|
} catch (final FlowFileAccessException ffae) {
|
||||||
|
if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) {
|
||||||
|
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae});
|
||||||
|
flowFile = session.penalize(flowFile);
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw ffae;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!attributes.isEmpty()) {
|
if (!attributes.isEmpty()) {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Map;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -234,6 +235,35 @@ public class TestFetchS3Object {
|
||||||
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
|
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetObjectReturnsNull() throws IOException {
|
||||||
|
runner.setProperty(FetchS3Object.REGION, "us-east-1");
|
||||||
|
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("filename", "request-key");
|
||||||
|
runner.enqueue(new byte[0], attrs);
|
||||||
|
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlowFileAccessExceptionGoesToFailure() throws IOException {
|
||||||
|
runner.setProperty(FetchS3Object.REGION, "us-east-1");
|
||||||
|
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("filename", "request-key");
|
||||||
|
runner.enqueue(new byte[0], attrs);
|
||||||
|
|
||||||
|
AmazonS3Exception amazonException = new AmazonS3Exception("testing");
|
||||||
|
Mockito.doThrow(new FlowFileAccessException("testing nested", amazonException)).when(mockS3Client).getObject(Mockito.any());
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
@Test
|
@Test
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
FetchS3Object processor = new FetchS3Object();
|
FetchS3Object processor = new FetchS3Object();
|
||||||
|
|
Loading…
Reference in New Issue