NIFI-8087 Adds MessageGroupID and MessageDuplicationId to PutSNS

Updated topic arn assert to match dummy topic name with .fifo suffix

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5367.
This commit is contained in:
Alasdair Brown 2021-09-04 14:12:49 +01:00 committed by Pierre Villard
parent 9dfcba24cd
commit 50e60c812d
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 67 additions and 1 deletions

View File

@ -75,9 +75,28 @@ public class PutSNS extends AbstractSNSProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder()
.name("Message Group ID")
.displayName("Message Group ID")
.description("If using FIFO, the message group to which the flowFile belongs")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder()
.name("Deduplication Message ID")
.displayName("Deduplication Message ID")
.description("The token used for deduplication of sent messages")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
MESSAGEGROUPID, MESSAGEDEDUPLICATIONID));
public static final int MAX_SIZE = 256 * 1024; public static final int MAX_SIZE = 256 * 1024;
@ -120,6 +139,18 @@ public class PutSNS extends AbstractSNSProcessor {
final PublishRequest request = new PublishRequest(); final PublishRequest request = new PublishRequest();
request.setMessage(message); request.setMessage(message);
if (context.getProperty(MESSAGEGROUPID).isSet()) {
request.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
request.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
request.setMessageStructure("json"); request.setMessageStructure("json");
} }

View File

@ -89,6 +89,41 @@ public class TestPutSNS {
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt"); ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt");
} }
@Test
public void testPublishFIFO() throws IOException {
runner.setProperty(PutSNS.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo");
runner.setProperty(PutSNS.SUBJECT, "${eval.subject}");
runner.setProperty(PutSNS.MESSAGEDEDUPLICATIONID, "${myuuid}");
runner.setProperty(PutSNS.MESSAGEGROUPID, "test1234");
assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "1.txt");
ffAttributes.put("eval.subject", "test-subject");
ffAttributes.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
runner.enqueue("Test Message Content", ffAttributes);
PublishResult mockPublishResult = new PublishResult();
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult);
runner.run();
ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture());
PublishRequest request = captureRequest.getValue();
assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.getTopicArn());
assertEquals("Test Message Content", request.getMessage());
assertEquals("test-subject", request.getSubject());
assertEquals("test1234", request.getMessageGroupId());
assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getMessageDeduplicationId());
assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue());
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "1.txt");
}
@Test @Test
public void testPublishFailure() throws IOException { public void testPublishFailure() throws IOException {
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1"); runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:test-topic-1");