diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index 5beead5a35..e2cef50503 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.StandardValidators; @@ -55,7 +56,8 @@ public class DeleteSQS extends AbstractSQSProcessor { .build(); public static final List properties = Collections.unmodifiableList( - Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT)); + Arrays.asList(QUEUE_URL, RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, + REGION, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT)); @Override protected List getSupportedPropertyDescriptors() { @@ -80,7 +82,10 @@ public class DeleteSQS extends AbstractSQSProcessor { for (final FlowFile flowFile : flowFiles) { final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); - entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue()); + String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue(); + entry.setReceiptHandle(receiptHandle); + String entryId = flowFile.getAttribute(CoreAttributes.UUID.key()); + entry.setId(entryId); entries.add(entry); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java new file mode 100644 index 0000000000..04fa676661 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import com.amazonaws.regions.Regions; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SendMessageResult; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.services.sqs.AmazonSQSClient; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary queues created") +public class ITDeleteSQS { + + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + private final String TEST_QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/123456789012/nifi-test-queue"; + private final String TEST_REGION = "us-west-2"; + AmazonSQSClient sqsClient = null; + + @Before + public void setUp() throws IOException { + PropertiesCredentials credentials = new PropertiesCredentials(new File(CREDENTIALS_FILE)); + sqsClient = new AmazonSQSClient(credentials); + sqsClient.withRegion(Regions.fromName(TEST_REGION)); + } + + @Test + public void testSimpleDelete() throws IOException { + // Setup - put one message in queue + SendMessageResult sendMessageResult = sqsClient.sendMessage(TEST_QUEUE_URL, "Test message"); + assertEquals(200, sendMessageResult.getSdkHttpMetadata().getHttpStatusCode()); + + // Setup - receive message to get receipt handle + ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(TEST_QUEUE_URL); + assertEquals(200, receiveMessageResult.getSdkHttpMetadata().getHttpStatusCode()); + Message deleteMessage = receiveMessageResult.getMessages().get(0); + String receiptHandle = deleteMessage.getReceiptHandle(); + + // Test - delete message with DeleteSQS + final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS()); + runner.setProperty(DeleteSQS.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(DeleteSQS.QUEUE_URL, TEST_QUEUE_URL); + runner.setProperty(DeleteSQS.REGION, TEST_REGION); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "1.txt"); + ffAttributes.put("sqs.receipt.handle", receiptHandle); + runner.enqueue("TestMessageBody", ffAttributes); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1); + } + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java index 11cc7cdbc7..feb00750f5 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java @@ -16,12 +16,9 @@ */ package org.apache.nifi.processors.aws.sqs; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -48,11 +45,6 @@ public class TestDeleteSQS { public void setUp() { mockSQSClient = Mockito.mock(AmazonSQSClient.class); mockDeleteSQS = new DeleteSQS() { - - protected List getSupportedPropertyDescriptors() { - return Arrays.asList(RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT); - } - protected AmazonSQSClient getClient() { actualSQSClient = client; return mockSQSClient; @@ -81,6 +73,26 @@ public class TestDeleteSQS { runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1); } + @Test + public void testDeleteWithCustomReceiptHandle() { + runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + runner.setProperty(DeleteSQS.RECEIPT_HANDLE, "${custom.receipt.handle}"); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "1.txt"); + ffAttributes.put("custom.receipt.handle", "test-receipt-handle-1"); + runner.enqueue("TestMessageBody", ffAttributes); + + runner.assertValid(); + runner.run(1); + + ArgumentCaptor captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); + DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); + assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle()); + + runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1); + } + @Test public void testDeleteException() { runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");