From a584251f0aba49dfad90fb0d36ab568effa7005d Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Sun, 8 Oct 2023 06:30:22 -0400 Subject: [PATCH] NIFI-12189: Upgrading PutCloudwatchMetric to use AWS SDK 2.x This closes #7857 Signed-off-by: Chris Sampson --- .../nifi-aws-processors/pom.xml | 8 +- .../aws/cloudwatch/PutCloudWatchMetric.java | 117 ++++++++---------- .../cloudwatch/MockPutCloudWatchMetric.java | 16 +-- .../cloudwatch/TestPutCloudWatchMetric.java | 56 +++++---- 4 files changed, 91 insertions(+), 106 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index 6b05a42cc4..9cd6ef33f0 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -66,10 +66,6 @@ org.apache.nifi nifi-ssl-context-service-api - - com.amazonaws - aws-java-sdk-cloudwatchmetrics - com.amazonaws aws-java-sdk-sts @@ -78,6 +74,10 @@ software.amazon.awssdk sts + + software.amazon.awssdk + cloudwatch + commons-beanutils commons-beanutils diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java index 678fadfaa0..2e4a702e07 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java @@ -16,16 +16,7 @@ */ package org.apache.nifi.processors.aws.cloudwatch; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - +import com.amazonaws.AmazonClientException; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -45,19 +36,25 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse; +import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; +import software.amazon.awssdk.services.cloudwatch.model.StatisticSet; -import com.amazonaws.AmazonClientException; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; -import com.amazonaws.services.cloudwatch.model.Dimension; -import com.amazonaws.services.cloudwatch.model.MetricDatum; -import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; -import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; -import com.amazonaws.services.cloudwatch.model.StatisticSet; -import com.amazonaws.services.cloudwatch.model.StandardUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -67,7 +64,7 @@ import com.amazonaws.services.cloudwatch.model.StandardUnit; description = "Allows dimension name/value pairs to be added to the metric. AWS supports a maximum of 10 dimensions.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"}) -public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor { +public class PutCloudWatchMetric extends AbstractAwsSyncProcessor { public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); @@ -200,6 +197,11 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor private volatile Set dynamicPropertyNames = new HashSet<>(); + @Override + protected CloudWatchClientBuilder createClientBuilder(final ProcessContext context) { + return CloudWatchClient.builder(); + } + @Override protected List getSupportedPropertyDescriptors() { return properties; @@ -262,90 +264,71 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor return problems; } - /** - * Create client using aws credentials provider. This is the preferred way for creating clients - */ @Override - protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentialsProvider awsCredentialsProvider, ClientConfiguration clientConfiguration) { - getLogger().info("Creating client using aws credentials provider"); - return new AmazonCloudWatchClient(awsCredentialsProvider, clientConfiguration); - } - - /** - * Create client using AWSCredentials - * - * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead - */ - @Override - @Deprecated - protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentials awsCredentials, ClientConfiguration clientConfiguration) { - getLogger().debug("Creating client with aws credentials"); - return new AmazonCloudWatchClient(awsCredentials, clientConfiguration); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); if (flowFile == null) { return; } - MetricDatum datum = new MetricDatum(); + final MetricDatum.Builder datumBuilder = MetricDatum.builder(); try { - datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue()); + datumBuilder.metricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue()); final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue(); if (valueString != null) { - datum.setValue(Double.parseDouble(valueString)); + datumBuilder.value(Double.parseDouble(valueString)); } else { - StatisticSet statisticSet = new StatisticSet(); - statisticSet.setMaximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue())); - statisticSet.setMinimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue())); - statisticSet.setSampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue())); - statisticSet.setSum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue())); + final StatisticSet statisticSet = StatisticSet.builder() + .maximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue())) + .minimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue())) + .sampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue())) + .sum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue())) + .build(); - datum.setStatisticValues(statisticSet); + datumBuilder.statisticValues(statisticSet); } final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); if (timestamp != null) { - datum.setTimestamp(new Date(Long.parseLong(timestamp))); + datumBuilder.timestamp(new Date(Long.parseLong(timestamp)).toInstant()); } final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue(); if (unit != null) { - datum.setUnit(unit); + datumBuilder.unit(unit); } // add dynamic properties as dimensions if (!dynamicPropertyNames.isEmpty()) { final List dimensions = new ArrayList<>(dynamicPropertyNames.size()); - for (String propertyName : dynamicPropertyNames) { + for (final String propertyName : dynamicPropertyNames) { final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue(); if (StringUtils.isNotBlank(propertyValue)) { - dimensions.add(new Dimension().withName(propertyName).withValue(propertyValue)); + dimensions.add(Dimension.builder().name(propertyName).value(propertyValue).build()); } } - datum.withDimensions(dimensions); + datumBuilder.dimensions(dimensions); } - final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest() - .withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue()) - .withMetricData(datum); + final PutMetricDataRequest metricDataRequest = PutMetricDataRequest.builder() + .namespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue()) + .metricData(datumBuilder.build()) + .build(); putMetricData(context, metricDataRequest); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile}); + getLogger().info("Successfully published cloudwatch metric for {}", flowFile); } catch (final Exception e) { - getLogger().error("Failed to publish cloudwatch metric for {} due to {}", new Object[]{flowFile, e}); + getLogger().error("Failed to publish cloudwatch metric for {} due to {}", flowFile, e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } } - protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException { - final AmazonCloudWatchClient client = getClient(context); - final PutMetricDataResult result = client.putMetricData(metricDataRequest); + protected PutMetricDataResponse putMetricData(final ProcessContext context, final PutMetricDataRequest metricDataRequest) throws AmazonClientException { + final CloudWatchClient client = getClient(context); + final PutMetricDataResponse result = client.putMetricData(metricDataRequest); return result; } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java index 84102d0d38..b5aae1f10c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java @@ -17,12 +17,12 @@ package org.apache.nifi.processors.aws.cloudwatch; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.cloudwatch.model.MetricDatum; -import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; -import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; +import org.apache.nifi.processor.ProcessContext; +import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse; import java.util.List; -import org.apache.nifi.processor.ProcessContext; /** @@ -33,14 +33,14 @@ public class MockPutCloudWatchMetric extends PutCloudWatchMetric { protected String actualNamespace; protected List actualMetricData; protected AmazonClientException throwException; - protected PutMetricDataResult result = new PutMetricDataResult(); + protected PutMetricDataResponse result = PutMetricDataResponse.builder().build(); protected int putMetricDataCallCount = 0; - protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException { + protected PutMetricDataResponse putMetricData(final ProcessContext context, final PutMetricDataRequest metricDataRequest) throws AmazonClientException { putMetricDataCallCount++; - actualNamespace = metricDataRequest.getNamespace(); - actualMetricData = metricDataRequest.getMetricData(); + actualNamespace = metricDataRequest.namespace(); + actualMetricData = metricDataRequest.metricData(); if (throwException != null) { throw throwException; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java index a0cfcd518c..8574c91490 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java @@ -16,24 +16,26 @@ */ package org.apache.nifi.processors.aws.cloudwatch; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.stream.Stream; - import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - -import com.amazonaws.services.cloudwatch.model.Dimension; -import com.amazonaws.services.cloudwatch.model.MetricDatum; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; /** * Unit tests for {@link PutCloudWatchMetric}. @@ -59,8 +61,8 @@ public class TestPutCloudWatchMetric { assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); - assertEquals("TestMetric", datum.getMetricName()); - assertEquals(1d, datum.getValue(), 0.0001d); + assertEquals("TestMetric", datum.metricName()); + assertEquals(1d, datum.value(), 0.0001d); } @Test @@ -150,8 +152,8 @@ public class TestPutCloudWatchMetric { assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); - assertEquals("TestMetric", datum.getMetricName()); - assertEquals(1.23d, datum.getValue(), 0.0001d); + assertEquals("TestMetric", datum.metricName()); + assertEquals(1.23d, datum.value(), 0.0001d); } @Test @@ -179,11 +181,11 @@ public class TestPutCloudWatchMetric { assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); - assertEquals("TestMetric", datum.getMetricName()); - assertEquals(1.0d, datum.getStatisticValues().getMinimum(), 0.0001d); - assertEquals(2.0d, datum.getStatisticValues().getMaximum(), 0.0001d); - assertEquals(3.0d, datum.getStatisticValues().getSum(), 0.0001d); - assertEquals(2.0d, datum.getStatisticValues().getSampleCount(), 0.0001d); + assertEquals("TestMetric", datum.metricName()); + assertEquals(1.0d, datum.statisticValues().minimum(), 0.0001d); + assertEquals(2.0d, datum.statisticValues().maximum(), 0.0001d); + assertEquals(3.0d, datum.statisticValues().sum(), 0.0001d); + assertEquals(2.0d, datum.statisticValues().sampleCount(), 0.0001d); } @Test @@ -209,16 +211,16 @@ public class TestPutCloudWatchMetric { assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); - assertEquals("TestMetric", datum.getMetricName()); - assertEquals(1d, datum.getValue(), 0.0001d); + assertEquals("TestMetric", datum.metricName()); + assertEquals(1d, datum.value(), 0.0001d); - List dimensions = datum.getDimensions(); - Collections.sort(dimensions, (d1, d2) -> d1.getName().compareTo(d2.getName())); + List dimensions = new ArrayList<>(datum.dimensions()); + Collections.sort(dimensions, Comparator.comparing(Dimension::name)); assertEquals(2, dimensions.size()); - assertEquals("dim1", dimensions.get(0).getName()); - assertEquals("1", dimensions.get(0).getValue()); - assertEquals("dim2", dimensions.get(1).getName()); - assertEquals("val2", dimensions.get(1).getValue()); + assertEquals("dim1", dimensions.get(0).name()); + assertEquals("1", dimensions.get(0).value()); + assertEquals("dim2", dimensions.get(1).name()); + assertEquals("val2", dimensions.get(1).value()); } @Test