mirror of
https://github.com/apache/nifi.git
synced 2025-02-09 11:35:05 +00:00
NIFI-4015 NIFI-3999 Fix DeleteSQS Issues
* Avoid exception by providing id in DeleteMessageBatchRequestEntry * Include receipt handle property descriptor for user configuration Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #1888.
This commit is contained in:
parent
c3059939e8
commit
10254a03c2
@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
@ -55,7 +56,8 @@ public class DeleteSQS extends AbstractSQSProcessor {
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
|
||||
Arrays.asList(QUEUE_URL, RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
|
||||
REGION, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
@ -80,7 +82,10 @@ public class DeleteSQS extends AbstractSQSProcessor {
|
||||
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
|
||||
entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
|
||||
String receiptHandle = context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
|
||||
entry.setReceiptHandle(receiptHandle);
|
||||
String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
|
||||
entry.setId(entryId);
|
||||
entries.add(entry);
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.amazonaws.regions.Regions;
|
||||
import com.amazonaws.services.sqs.model.Message;
|
||||
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
|
||||
import com.amazonaws.services.sqs.model.SendMessageResult;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import com.amazonaws.auth.PropertiesCredentials;
|
||||
import com.amazonaws.services.sqs.AmazonSQSClient;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
||||
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary queues created")
|
||||
public class ITDeleteSQS {
|
||||
|
||||
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||
private final String TEST_QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/123456789012/nifi-test-queue";
|
||||
private final String TEST_REGION = "us-west-2";
|
||||
AmazonSQSClient sqsClient = null;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
PropertiesCredentials credentials = new PropertiesCredentials(new File(CREDENTIALS_FILE));
|
||||
sqsClient = new AmazonSQSClient(credentials);
|
||||
sqsClient.withRegion(Regions.fromName(TEST_REGION));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDelete() throws IOException {
|
||||
// Setup - put one message in queue
|
||||
SendMessageResult sendMessageResult = sqsClient.sendMessage(TEST_QUEUE_URL, "Test message");
|
||||
assertEquals(200, sendMessageResult.getSdkHttpMetadata().getHttpStatusCode());
|
||||
|
||||
// Setup - receive message to get receipt handle
|
||||
ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(TEST_QUEUE_URL);
|
||||
assertEquals(200, receiveMessageResult.getSdkHttpMetadata().getHttpStatusCode());
|
||||
Message deleteMessage = receiveMessageResult.getMessages().get(0);
|
||||
String receiptHandle = deleteMessage.getReceiptHandle();
|
||||
|
||||
// Test - delete message with DeleteSQS
|
||||
final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS());
|
||||
runner.setProperty(DeleteSQS.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(DeleteSQS.QUEUE_URL, TEST_QUEUE_URL);
|
||||
runner.setProperty(DeleteSQS.REGION, TEST_REGION);
|
||||
final Map<String, String> ffAttributes = new HashMap<>();
|
||||
ffAttributes.put("filename", "1.txt");
|
||||
ffAttributes.put("sqs.receipt.handle", receiptHandle);
|
||||
runner.enqueue("TestMessageBody", ffAttributes);
|
||||
|
||||
runner.run(1);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
}
|
@ -16,12 +16,9 @@
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
@ -48,11 +45,6 @@ public class TestDeleteSQS {
|
||||
public void setUp() {
|
||||
mockSQSClient = Mockito.mock(AmazonSQSClient.class);
|
||||
mockDeleteSQS = new DeleteSQS() {
|
||||
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return Arrays.asList(RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT);
|
||||
}
|
||||
|
||||
protected AmazonSQSClient getClient() {
|
||||
actualSQSClient = client;
|
||||
return mockSQSClient;
|
||||
@ -81,6 +73,26 @@ public class TestDeleteSQS {
|
||||
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteWithCustomReceiptHandle() {
|
||||
runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
|
||||
runner.setProperty(DeleteSQS.RECEIPT_HANDLE, "${custom.receipt.handle}");
|
||||
final Map<String, String> ffAttributes = new HashMap<>();
|
||||
ffAttributes.put("filename", "1.txt");
|
||||
ffAttributes.put("custom.receipt.handle", "test-receipt-handle-1");
|
||||
runner.enqueue("TestMessageBody", ffAttributes);
|
||||
|
||||
runner.assertValid();
|
||||
runner.run(1);
|
||||
|
||||
ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
|
||||
Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
|
||||
DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue();
|
||||
assertEquals("test-receipt-handle-1", deleteRequest.getEntries().get(0).getReceiptHandle());
|
||||
|
||||
runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteException() {
|
||||
runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
|
||||
|
Loading…
x
Reference in New Issue
Block a user