NIFI-4540 - Added FIFO options to PutSQS

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4705.
This commit is contained in:
r65535 2020-12-03 08:32:53 +00:00 committed by Pierre Villard
parent 37c2284d04
commit 2f99d6fce4
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 63 additions and 2 deletions

View File

@ -61,15 +61,35 @@ public class PutSQS extends AbstractSQSProcessor {
public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder() public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
.name("Delay") .name("Delay")
.displayName("Delay")
.description("The amount of time to delay the message before it becomes available to consumers") .description("The amount of time to delay the message before it becomes available to consumers")
.required(true) .required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 secs") .defaultValue("0 secs")
.build(); .build();
public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder()
.name("message-group-id")
.displayName("Message Group ID")
.description("If using FIFO, the message group to which the FlowFile belongs")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder()
.name("deduplication-message-id")
.displayName("Deduplication Message ID")
.description("The token used for deduplication of sent messages")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
REGION, DELAY, TIMEOUT, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); REGION, DELAY, TIMEOUT, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
PROXY_PASSWORD, MESSAGEGROUPID, MESSAGEDEDUPLICATIONID));
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList(); private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
@ -115,11 +135,23 @@ public class PutSQS extends AbstractSQSProcessor {
final Set<SendMessageBatchRequestEntry> entries = new HashSet<>(); final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
entry.setId(flowFile.getAttribute("uuid"));
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos); session.exportTo(flowFile, baos);
final String flowFileContent = baos.toString(); final String flowFileContent = baos.toString();
entry.setMessageBody(flowFileContent); entry.setMessageBody(flowFileContent);
entry.setId(flowFile.getAttribute("uuid"));
if (context.getProperty(MESSAGEGROUPID).isSet()) {
entry.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
entry.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();

View File

@ -101,4 +101,33 @@ public class TestPutSQS {
runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1);
} }
@Test
public void testFIFOPut() throws IOException {
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000");
runner.setProperty(PutSQS.MESSAGEDEDUPLICATIONID, "${myuuid}");
runner.setProperty(PutSQS.MESSAGEGROUPID, "test1234");
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "1.txt");
attrs.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
runner.enqueue("TestMessageBody", attrs);
SendMessageBatchResult batchResult = new SendMessageBatchResult();
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
runner.run(1);
ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class);
Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture());
SendMessageBatchRequest request = captureRequest.getValue();
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl());
assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody());
assertEquals("test1234", request.getEntries().get(0).getMessageGroupId());
assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getEntries().get(0).getMessageDeduplicationId());
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
}
} }