diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml index 177839b7de..9e74b33969 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml @@ -43,6 +43,16 @@ + + software.amazon.awssdk + sns + + + software.amazon.awssdk + netty-nio-client + + + com.amazonaws aws-java-sdk-core @@ -78,14 +88,6 @@ com.amazonaws aws-java-sdk-s3 - - com.amazonaws - aws-java-sdk-sns - - - com.amazonaws - aws-java-sdk-sqs - commons-io commons-io diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java index ba955e4d89..b2621d49dc 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java @@ -21,14 +21,12 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.sns.AmazonSNSClient; +import org.apache.nifi.processors.aws.v2.AbstractAwsProcessor; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.SnsClientBuilder; -public abstract class AbstractSNSProcessor extends AbstractAWSCredentialsProviderProcessor { +public abstract class AbstractSNSProcessor extends AbstractAwsProcessor { protected static final AllowableValue ARN_TYPE_TOPIC = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); @@ -52,25 +50,9 @@ public abstract class AbstractSNSProcessor extends AbstractAWSCredentialsProvide .defaultValue(ARN_TYPE_TOPIC.getValue()) .build(); - /** - * Create client using aws credentials provider. This is the preferred way for creating clients - */ + @Override - protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { - getLogger().info("Creating client using aws credentials provider"); - - return new AmazonSNSClient(credentialsProvider, config); - } - - /** - * Create client using AWSCredentials - * - * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead - */ - @Override - protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - getLogger().info("Creating client using aws credentials"); - - return new AmazonSNSClient(credentials, config); + protected SnsClientBuilder createClientBuilder(final ProcessContext context) { + return SnsClient.builder(); } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index 50c966fdaa..43a1dec034 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.processors.aws.sns; -import com.amazonaws.services.sns.AmazonSNSClient; -import com.amazonaws.services.sns.model.MessageAttributeValue; -import com.amazonaws.services.sns.model.PublishRequest; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -35,11 +32,15 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.aws.sqs.GetSQS; import org.apache.nifi.processors.aws.sqs.PutSQS; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; +import software.amazon.awssdk.services.sns.model.PublishRequest; import java.io.ByteArrayOutputStream; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -137,55 +138,59 @@ public class PutSNS extends AbstractSNSProcessor { session.exportTo(flowFile, baos); final String message = new String(baos.toByteArray(), charset); - final AmazonSNSClient client = getClient(context); - final PublishRequest request = new PublishRequest(); - request.setMessage(message); + final SnsClient client = getClient(context); + + final PublishRequest.Builder requestBuilder = PublishRequest.builder(); + requestBuilder.message(message); if (context.getProperty(MESSAGEGROUPID).isSet()) { - request.setMessageGroupId(context.getProperty(MESSAGEGROUPID) + requestBuilder.messageGroupId(context.getProperty(MESSAGEGROUPID) .evaluateAttributeExpressions(flowFile) .getValue()); } if (context.getProperty(MESSAGEDEDUPLICATIONID).isSet()) { - request.setMessageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID) + requestBuilder.messageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID) .evaluateAttributeExpressions(flowFile) .getValue()); } if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { - request.setMessageStructure("json"); + requestBuilder.messageStructure("json"); } final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue(); final String arnType = context.getProperty(ARN_TYPE).getValue(); if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) { - request.setTopicArn(arn); + requestBuilder.topicArn(arn); } else { - request.setTargetArn(arn); + requestBuilder.targetArn(arn); } final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue(); if (subject != null) { - request.setSubject(subject); + requestBuilder.subject(subject); } + final Map messageAttributes = new HashMap<>(); for (final Map.Entry entry : context.getProperties().entrySet()) { if (entry.getKey().isDynamic() && !StringUtils.isEmpty(entry.getValue())) { - final MessageAttributeValue value = new MessageAttributeValue(); - value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); - value.setDataType("String"); - request.addMessageAttributesEntry(entry.getKey().getName(), value); + final MessageAttributeValue.Builder messageAttributeValueBuilder = MessageAttributeValue.builder(); + messageAttributeValueBuilder.stringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); + messageAttributeValueBuilder.dataType("String"); + + messageAttributes.put(entry.getKey().getName(), messageAttributeValueBuilder.build()); } } + requestBuilder.messageAttributes(messageAttributes); try { - client.publish(request); + client.publish(requestBuilder.build()); session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, arn); - getLogger().info("Successfully published notification for {}", new Object[]{flowFile}); + getLogger().info("Publishing completed for {}", flowFile); } catch (final Exception e) { - getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e}); + getLogger().error("Publishing failed for {}", flowFile, e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java index 4d5e5763fd..a0de0ccac7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java @@ -35,12 +35,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ITPutSNS { private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + private final String TOPIC_ARN = "Add SNS ARN here"; @Test public void testPublish() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutSNS()); runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1"); + runner.setProperty(PutSNS.ARN, TOPIC_ARN); assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); final Map attrs = new HashMap<>(); @@ -54,8 +55,7 @@ public class ITPutSNS { @Test public void testPublishWithCredentialsProviderService() throws Throwable { final TestRunner runner = TestRunners.newTestRunner(new PutSNS()); - String snsArn = "Add Sns arn here"; - runner.setProperty(PutSNS.ARN, snsArn); + runner.setProperty(PutSNS.ARN, TOPIC_ARN); assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java index 1f87becd53..828eaf1e07 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/TestPutSNS.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.processors.aws.sns; -import com.amazonaws.services.sns.AmazonSNSClient; -import com.amazonaws.services.sns.model.AmazonSNSException; -import com.amazonaws.services.sns.model.PublishRequest; -import com.amazonaws.services.sns.model.PublishResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; @@ -29,6 +25,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.model.PublishRequest; +import software.amazon.awssdk.services.sns.model.PublishResponse; +import software.amazon.awssdk.services.sns.model.SnsException; import java.util.HashMap; import java.util.List; @@ -42,14 +42,14 @@ public class TestPutSNS { private TestRunner runner = null; private PutSNS mockPutSNS = null; - private AmazonSNSClient mockSNSClient = null; + private SnsClient mockSNSClient = null; @BeforeEach public void setUp() { - mockSNSClient = Mockito.mock(AmazonSNSClient.class); + mockSNSClient = Mockito.mock(SnsClient.class); mockPutSNS = new PutSNS() { @Override - protected AmazonSNSClient getClient(ProcessContext context) { + protected SnsClient getClient(ProcessContext context) { return mockSNSClient; } }; @@ -67,18 +67,18 @@ public class TestPutSNS { ffAttributes.put("eval.subject", "test-subject"); runner.enqueue("Test Message Content", ffAttributes); - PublishResult mockPublishResult = new PublishResult(); - Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult); + final PublishResponse mockPublishResponse = PublishResponse.builder().build(); + Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResponse); runner.run(); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PublishRequest.class); Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture()); PublishRequest request = captureRequest.getValue(); - assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1", request.getTopicArn()); - assertEquals("Test Message Content", request.getMessage()); - assertEquals("test-subject", request.getSubject()); - assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue()); + assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1", request.topicArn()); + assertEquals("Test Message Content", request.message()); + assertEquals("test-subject", request.subject()); + assertEquals("hello!", request.messageAttributes().get("DynamicProperty").stringValue()); runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); List flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS); @@ -100,20 +100,20 @@ public class TestPutSNS { ffAttributes.put("myuuid", "fb0dfed8-092e-40ee-83ce-5b576cd26236"); runner.enqueue("Test Message Content", ffAttributes); - PublishResult mockPublishResult = new PublishResult(); - Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResult); + final PublishResponse mockPublishResponse = PublishResponse.builder().build(); + Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenReturn(mockPublishResponse); runner.run(); ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PublishRequest.class); Mockito.verify(mockSNSClient, Mockito.times(1)).publish(captureRequest.capture()); PublishRequest request = captureRequest.getValue(); - assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.getTopicArn()); - assertEquals("Test Message Content", request.getMessage()); - assertEquals("test-subject", request.getSubject()); - assertEquals("test1234", request.getMessageGroupId()); - assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.getMessageDeduplicationId()); - assertEquals("hello!", request.getMessageAttributes().get("DynamicProperty").getStringValue()); + assertEquals("arn:aws:sns:us-west-2:123456789012:test-topic-1.fifo", request.topicArn()); + assertEquals("Test Message Content", request.message()); + assertEquals("test-subject", request.subject()); + assertEquals("test1234", request.messageGroupId()); + assertEquals("fb0dfed8-092e-40ee-83ce-5b576cd26236", request.messageDeduplicationId()); + assertEquals("hello!", request.messageAttributes().get("DynamicProperty").stringValue()); runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); List flowFiles = runner.getFlowFilesForRelationship(PutSNS.REL_SUCCESS); @@ -127,7 +127,7 @@ public class TestPutSNS { final Map ffAttributes = new HashMap<>(); ffAttributes.put("filename", "1.txt"); runner.enqueue("Test Message Content", ffAttributes); - Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenThrow(new AmazonSNSException("Fail")); + Mockito.when(mockSNSClient.publish(Mockito.any(PublishRequest.class))).thenThrow(SnsException.builder().build()); runner.run();