NIFI-11642 Updated PutSNS using AWS SDK 2

This closes #7362

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2023-06-05 11:51:11 -07:00 committed by exceptionfactory
parent 336b857442
commit 31e1c17d44
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 66 additions and 77 deletions

View File

@ -43,6 +43,16 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
@ -78,14 +88,6 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -21,14 +21,12 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.sns.AmazonSNSClient;
import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.SnsClientBuilder;
public abstract class AbstractSNSProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonSNSClient> {
public abstract class AbstractSNSProcessor extends AbstractAwsProcessor<SnsClient, SnsClientBuilder> {
protected static final AllowableValue ARN_TYPE_TOPIC
= new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
@ -52,25 +50,9 @@ public abstract class AbstractSNSProcessor extends AbstractAWSCredentialsProvide
.defaultValue(ARN_TYPE_TOPIC.getValue())
.build();
/**
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override
protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials provider");
return new AmazonSNSClient(credentialsProvider, config);
}
/**
* Create client using AWSCredentials
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
getLogger().info("Creating client using aws credentials");
return new AmazonSNSClient(credentials, config);
protected SnsClientBuilder createClientBuilder(final ProcessContext context) {
return SnsClient.builder();
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.nifi.processors.aws.sns;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -35,11 +32,15 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.sqs.GetSQS;
import org.apache.nifi.processors.aws.sqs.PutSQS;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -137,55 +138,59 @@ public class PutSNS extends AbstractSNSProcessor {
session.exportTo(flowFile, baos);
final String message = new String(baos.toByteArray(), charset);
final AmazonSNSClient client = getClient(context);
final PublishRequest request = new PublishRequest();
request.setMessage(message);
final SnsClient client = getClient(context);
final PublishRequest.Builder requestBuilder = PublishRequest.builder();
requestBuilder.message(message);
if (context.getProperty(MESSAGEGROUPID).isSet()) {
request.setMessageGroupId(context.getProperty(MESSAGEGROUPID)
requestBuilder.messageGroupId(context.getProperty(MESSAGEGROUPID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) {
request.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
requestBuilder.messageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID)
.evaluateAttributeExpressions(flowFile)
.getValue());
}
if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
request.setMessageStructure("json");
requestBuilder.messageStructure("json");
}
final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
final String arnType = context.getProperty(ARN_TYPE).getValue();
if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
request.setTopicArn(arn);
requestBuilder.topicArn(arn);
} else {
request.setTargetArn(arn);
requestBuilder.targetArn(arn);
}
final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
if (subject != null) {
request.setSubject(subject);
requestBuilder.subject(subject);
}
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic() && !StringUtils.isEmpty(entry.getValue())) {
final MessageAttributeValue value = new MessageAttributeValue();
value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
value.setDataType("String");
request.addMessageAttributesEntry(entry.getKey().getName(), value);
final MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder();
messageAttributeValueBuilder.stringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
messageAttributeValueBuilder.dataType("String");
messageAttributes.put(entry.getKey().getName(), messageAttributeValueBuilder.build());
}
}
requestBuilder.messageAttributes(messageAttributes);
try {
client.publish(request);
client.publish(requestBuilder.build());
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, arn);
getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
getLogger().info("Publishing completed for {}", flowFile);
} catch (final Exception e) {
getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
getLogger().error("Publishing failed for {}", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -35,12 +35,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITPutSNS {
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
private final String TOPIC_ARN = "Add SNS ARN here";
@Test
public void testPublish() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1");
runner.setProperty(PutSNS.ARN, TOPIC_ARN);
assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
final Map<String, String> attrs = new HashMap<>();
@ -54,8 +55,7 @@ public class ITPutSNS {
@Test
public void testPublishWithCredentialsProviderService() throws Throwable {
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
String snsArn = "Add Sns arn here";
runner.setProperty(PutSNS.ARN, snsArn);
runner.setProperty(PutSNS.ARN, TOPIC_ARN);
assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.processors.aws.sns;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.AmazonSNSException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
@ -29,6 +25,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.SnsException;
import java.util.HashMap;
import java.util.List;
@ -42,14 +42,14 @@ public class TestPutSNS {
private TestRunner runner = null;
private PutSNS mockPutSNS = null;
private AmazonSNSClient mockSNSClient = null;
private SnsClient mockSNSClient = null;
@BeforeEach
public void setUp() {
mockSNSClient = Mockito.mock(AmazonSNSClient.class);
mockSNSClient = Mockito.mock(SnsClient.class);
mockPutSNS = new PutSNS() {
@Override
protected AmazonSNSClient getClient(ProcessContext context) {
protected SnsClient getClient(ProcessContext context) {
return mockSNSClient;
}
};
@ -67,18 +67,18 @@ public class TestPutSNS {
ffAttributes.put("eval.subject", "test-subject");
runner.enqueue("Test Message Content", ffAttributes);
PublishResult mockPublishResult = new PublishResult();
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult);
final PublishResponse mockPublishResponse = PublishResponse.builder().build();
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResponse);
runner.run();
ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture());
PublishRequest request = captureRequest.getValue();
assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1", request.getTopicArn());
assertEquals("Test Message Content", request.getMessage());
assertEquals("test-subject", request.getSubject());
assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue());
assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1", request.topicArn());
assertEquals("Test Message Content", request.message());
assertEquals("test-subject", request.subject());
assertEquals("hello!", request.messageAttributes().get("DynamicProperty").stringValue());
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS);
@ -100,20 +100,20 @@ public class TestPutSNS {
ffAttributes.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236");
runner.enqueue("Test Message Content", ffAttributes);
PublishResult mockPublishResult = new PublishResult();
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult);
final PublishResponse mockPublishResponse = PublishResponse.builder().build();
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResponse);
runner.run();
ArgumentCaptor<PublishRequest> captureRequest = ArgumentCaptor.forClass(PublishRequest.class);
Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture());
PublishRequest request = captureRequest.getValue();
assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.getTopicArn());
assertEquals("Test Message Content", request.getMessage());
assertEquals("test-subject", request.getSubject());
assertEquals("test1234", request.getMessageGroupId());
assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getMessageDeduplicationId());
assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue());
assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.topicArn());
assertEquals("Test Message Content", request.message());
assertEquals("test-subject", request.subject());
assertEquals("test1234", request.messageGroupId());
assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.messageDeduplicationId());
assertEquals("hello!", request.messageAttributes().get("DynamicProperty").stringValue());
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS);
@ -127,7 +127,7 @@ public class TestPutSNS {
final Map<String, String> ffAttributes = new HashMap<>();
ffAttributes.put("filename", "1.txt");
runner.enqueue("Test Message Content", ffAttributes);
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenThrow(new AmazonSNSException("Fail"));
Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenThrow(SnsException.builder().build());
runner.run();