diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index 2acc38b2a4..f4c3c0424c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -75,9 +75,28 @@ public class PutSNS extends AbstractSNSProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder() + .name("Message Group ID") + .displayName("Message Group ID") + .description("If using FIFO, the message group to which the flowFile belongs") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder() + .name("Deduplication Message ID") + .displayName("Deduplication Message ID") + .description("The token used for deduplication of sent messages") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, - USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); + USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, + MESSAGEGROUPID, MESSAGEDEDUPLICATIONID)); public static final int MAX_SIZE = 256 * 1024; @@ -120,6 +139,18 @@ public class PutSNS extends AbstractSNSProcessor { final PublishRequest request = new PublishRequest(); request.setMessage(message); + if (context.getProperty(MESSAGEGROUPID).isSet()) { + request.setMessageGroupId(context.getProperty(MESSAGEGROUPID) + .evaluateAttributeExpressions(flowFile) + .getValue()); + } + + if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) { + request.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID) + .evaluateAttributeExpressions(flowFile) + .getValue()); + } + if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { request.setMessageStructure("json"); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java index 33eba238e8..25cbde4bef 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java @@ -89,6 +89,41 @@ public class TestPutSNS { ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt"); } + @Test + public void testPublishFIFO() throws IOException { + runner.setProperty(PutSNS.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties"); + runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo"); + runner.setProperty(PutSNS.SUBJECT, "${eval.subject}"); + runner.setProperty(PutSNS.MESSAGEDEDUPLICATIONID, "${myuuid}"); + runner.setProperty(PutSNS.MESSAGEGROUPID, "test1234"); + assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "1.txt"); + ffAttributes.put("eval.subject", "test-subject"); + ffAttributes.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236"); + runner.enqueue("Test Message Content", ffAttributes); + + PublishResult mockPublishResult = new PublishResult(); + Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PublishRequest.class); + Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture()); + PublishRequest request = captureRequest.getValue(); + assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.getTopicArn()); + assertEquals("Test Message Content", request.getMessage()); + assertEquals("test-subject", request.getSubject()); + assertEquals("test1234", request.getMessageGroupId()); + assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getMessageDeduplicationId()); + assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue()); + + runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt"); + } + @Test public void testPublishFailure() throws IOException { runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1");