NIFI-6724 - Check for SQS API call result in case of failures

This closes #3897.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Pierre Villard 2019-11-20 18:07:06 +01:00 committed by Peter Turcsanyi
parent 5bfc86737d
commit 6507b78948
4 changed files with 40 additions and 28 deletions

View File

@ -33,11 +33,13 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
@SupportsBatching @SupportsBatching
@SeeAlso({GetSQS.class, PutSQS.class}) @SeeAlso({GetSQS.class, PutSQS.class})
@ -66,42 +68,41 @@ public class DeleteSQS extends AbstractSQSProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) { public void onTrigger(final ProcessContext context, final ProcessSession session) {
List<FlowFile> flowFiles = session.get(1); FlowFile flowFile = session.get();
if (flowFiles.isEmpty()) { if (flowFile == null) {
return; return;
} }
final FlowFile firstFlowFile = flowFiles.get(0); final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
final AmazonSQSClient client = getClient(); final AmazonSQSClient client = getClient();
final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(); final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
request.setQueueUrl(queueUrl); request.setQueueUrl(queueUrl);
final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size()); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
for (final FlowFile flowFile : flowFiles) {
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue(); String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
entry.setReceiptHandle(receiptHandle); entry.setReceiptHandle(receiptHandle);
String entryId = flowFile.getAttribute(CoreAttributes.UUID.key()); String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
entry.setId(entryId); entry.setId(entryId);
entries.add(entry); entries.add(entry);
}
request.setEntries(entries); request.setEntries(entries);
try { try {
client.deleteMessageBatch(request); DeleteMessageBatchResult response = client.deleteMessageBatch(request);
getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()});
session.transfer(flowFiles, REL_SUCCESS); // check for errors
} catch (final Exception e) { if (!response.getFailed().isEmpty()) {
getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e}); throw new ProcessException(response.getFailed().get(0).toString());
final List<FlowFile> penalizedFlowFiles = new ArrayList<>();
for (final FlowFile flowFile : flowFiles) {
penalizedFlowFiles.add(session.penalize(flowFile));
} }
session.transfer(penalizedFlowFiles, REL_FAILURE);
getLogger().info("Successfully deleted message from SQS for {}", new Object[] { flowFile });
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Failed to delete message from SQS due to {}", new Object[] { e });
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
} }
} }

View File

@ -40,12 +40,14 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageBatchResult;
@SupportsBatching @SupportsBatching
@SeeAlso({ GetSQS.class, DeleteSQS.class }) @SeeAlso({ GetSQS.class, DeleteSQS.class })
@ -135,7 +137,12 @@ public class PutSQS extends AbstractSQSProcessor {
request.setEntries(entries); request.setEntries(entries);
try { try {
client.sendMessageBatch(request); SendMessageBatchResult response = client.sendMessageBatch(request);
// check for errors
if (!response.getFailed().isEmpty()) {
throw new ProcessException(response.getFailed().get(0).toString());
}
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e}); getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
flowFile = session.penalize(flowFile); flowFile = session.penalize(flowFile);

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.aws.sqs; package org.apache.nifi.processors.aws.sqs;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -25,6 +26,7 @@ import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.AmazonSQSException; import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -38,15 +40,17 @@ public class TestDeleteSQS {
private TestRunner runner = null; private TestRunner runner = null;
private DeleteSQS mockDeleteSQS = null; private DeleteSQS mockDeleteSQS = null;
private AmazonSQSClient actualSQSClient = null;
private AmazonSQSClient mockSQSClient = null; private AmazonSQSClient mockSQSClient = null;
@Before @Before
public void setUp() { public void setUp() {
mockSQSClient = Mockito.mock(AmazonSQSClient.class); mockSQSClient = Mockito.mock(AmazonSQSClient.class);
DeleteMessageBatchResult mockResponse = Mockito.mock(DeleteMessageBatchResult.class);
Mockito.when(mockSQSClient.deleteMessageBatch(Mockito.any())).thenReturn(mockResponse);
Mockito.when(mockResponse.getFailed()).thenReturn(new ArrayList<>());
mockDeleteSQS = new DeleteSQS() { mockDeleteSQS = new DeleteSQS() {
@Override
protected AmazonSQSClient getClient() { protected AmazonSQSClient getClient() {
actualSQSClient = client;
return mockSQSClient; return mockSQSClient;
} }
}; };

View File

@ -26,7 +26,7 @@
<packaging>pom</packaging> <packaging>pom</packaging>
<properties> <properties>
<aws-java-sdk-version>1.11.599</aws-java-sdk-version> <aws-java-sdk-version>1.11.677</aws-java-sdk-version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>