diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java index 48b0f6984b..bfc75871a9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java @@ -61,15 +61,35 @@ public class PutSQS extends AbstractSQSProcessor { public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder() .name("Delay") + .displayName("Delay") .description("The amount of time to delay the message before it becomes available to consumers") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("0 secs") .build(); + public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder() + .name("message-group-id") + .displayName("Message Group ID") + .description("If using FIFO, the message group to which the FlowFile belongs") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder() + .name("deduplication-message-id") + .displayName("Deduplication Message ID") + .description("The token used for deduplication of sent messages") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, - REGION, DELAY, TIMEOUT, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); + REGION, DELAY, TIMEOUT, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, + PROXY_PASSWORD, MESSAGEGROUPID, MESSAGEDEDUPLICATIONID)); private volatile List userDefinedProperties = Collections.emptyList(); @@ -115,11 +135,23 @@ public class PutSQS extends AbstractSQSProcessor { final Set entries = new HashSet<>(); final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); - entry.setId(flowFile.getAttribute("uuid")); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); session.exportTo(flowFile, baos); final String flowFileContent = baos.toString(); entry.setMessageBody(flowFileContent); + entry.setId(flowFile.getAttribute("uuid")); + + if (context.getProperty(MESSAGEGROUPID).isSet()) { + entry.setMessageGroupId(context.getProperty(MESSAGEGROUPID) + .evaluateAttributeExpressions(flowFile) + .getValue()); + } + + if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) { + entry.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID) + .evaluateAttributeExpressions(flowFile) + .getValue()); + } final Map messageAttributes = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java index 39dae75904..4226d5267d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java @@ -101,4 +101,33 @@ public class TestPutSQS { runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1); } + @Test + public void testFIFOPut() throws IOException { + runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + runner.setProperty(PutSQS.MESSAGEDEDUPLICATIONID, "${myuuid}"); + runner.setProperty(PutSQS.MESSAGEGROUPID, "test1234"); + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "1.txt"); + attrs.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236"); + runner.enqueue("TestMessageBody", attrs); + + SendMessageBatchResult batchResult = new SendMessageBatchResult(); + Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult); + + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); + SendMessageBatchRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue()); + assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + assertEquals("test1234", request.getEntries().get(0).getMessageGroupId()); + assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getEntries().get(0).getMessageDeduplicationId()); + + runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); + } + }