NIFI-12189: Upgrading PutCloudwatchMetric to use AWS SDK 2.x

This closes #7857

Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>
This commit is contained in:
Joe Gresock 2023-10-08 06:30:22 -04:00 committed by Chris Sampson
parent d2fb81d1d6
commit a584251f0a
No known key found for this signature in database
GPG Key ID: 546AEB0826587237
4 changed files with 91 additions and 106 deletions

View File

@ -66,10 +66,6 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatchmetrics</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId> <artifactId>aws-java-sdk-sts</artifactId>
@ -78,6 +74,10 @@
<groupId>software.amazon.awssdk</groupId> <groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId> <artifactId>sts</artifactId>
</dependency> </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatch</artifactId>
</dependency>
<dependency> <dependency>
<groupId>commons-beanutils</groupId> <groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId> <artifactId>commons-beanutils</artifactId>

View File

@ -16,16 +16,7 @@
*/ */
package org.apache.nifi.processors.aws.cloudwatch; package org.apache.nifi.processors.aws.cloudwatch;
import java.util.ArrayList; import com.amazonaws.AmazonClientException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
@ -45,19 +36,25 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.cloudwatch.model.StatisticSet;
import com.amazonaws.AmazonClientException; import java.util.ArrayList;
import com.amazonaws.ClientConfiguration; import java.util.Arrays;
import com.amazonaws.auth.AWSCredentials; import java.util.Collection;
import com.amazonaws.auth.AWSCredentialsProvider; import java.util.Collections;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; import java.util.Date;
import com.amazonaws.services.cloudwatch.model.Dimension; import java.util.HashSet;
import com.amazonaws.services.cloudwatch.model.MetricDatum; import java.util.List;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; import java.util.Set;
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; import java.util.stream.Collectors;
import com.amazonaws.services.cloudwatch.model.StatisticSet;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@ -67,7 +64,7 @@ import com.amazonaws.services.cloudwatch.model.StandardUnit;
description = "Allows dimension name/value pairs to be added to the metric. AWS supports a maximum of 10 dimensions.", description = "Allows dimension name/value pairs to be added to the metric. AWS supports a maximum of 10 dimensions.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"}) @Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"})
public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor<AmazonCloudWatchClient> { public class PutCloudWatchMetric extends AbstractAwsSyncProcessor<CloudWatchClient, CloudWatchClientBuilder> {
public static final Set<Relationship> relationships = Collections.unmodifiableSet( public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@ -200,6 +197,11 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
private volatile Set<String> dynamicPropertyNames = new HashSet<>(); private volatile Set<String> dynamicPropertyNames = new HashSet<>();
@Override
protected CloudWatchClientBuilder createClientBuilder(final ProcessContext context) {
return CloudWatchClient.builder();
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties; return properties;
@ -262,90 +264,71 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
return problems; return problems;
} }
/**
* Create client using aws credentials provider. This is the preferred way for creating clients
*/
@Override @Override
protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentialsProvider awsCredentialsProvider, ClientConfiguration clientConfiguration) { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
getLogger().info("Creating client using aws credentials provider");
return new AmazonCloudWatchClient(awsCredentialsProvider, clientConfiguration);
}
/**
* Create client using AWSCredentials
*
* @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
*/
@Override
@Deprecated
protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentials awsCredentials, ClientConfiguration clientConfiguration) {
getLogger().debug("Creating client with aws credentials");
return new AmazonCloudWatchClient(awsCredentials, clientConfiguration);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get(); FlowFile flowFile = session.get();
if (flowFile == null) { if (flowFile == null) {
return; return;
} }
MetricDatum datum = new MetricDatum(); final MetricDatum.Builder datumBuilder = MetricDatum.builder();
try { try {
datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue()); datumBuilder.metricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue());
final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue(); final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (valueString != null) { if (valueString != null) {
datum.setValue(Double.parseDouble(valueString)); datumBuilder.value(Double.parseDouble(valueString));
} else { } else {
StatisticSet statisticSet = new StatisticSet(); final StatisticSet statisticSet = StatisticSet.builder()
statisticSet.setMaximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue())); .maximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue()))
statisticSet.setMinimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue())); .minimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue()))
statisticSet.setSampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue())); .sampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue()))
statisticSet.setSum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue())); .sum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue()))
.build();
datum.setStatisticValues(statisticSet); datumBuilder.statisticValues(statisticSet);
} }
final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
if (timestamp != null) { if (timestamp != null) {
datum.setTimestamp(new Date(Long.parseLong(timestamp))); datumBuilder.timestamp(new Date(Long.parseLong(timestamp)).toInstant());
} }
final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue(); final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue();
if (unit != null) { if (unit != null) {
datum.setUnit(unit); datumBuilder.unit(unit);
} }
// add dynamic properties as dimensions // add dynamic properties as dimensions
if (!dynamicPropertyNames.isEmpty()) { if (!dynamicPropertyNames.isEmpty()) {
final List<Dimension> dimensions = new ArrayList<>(dynamicPropertyNames.size()); final List<Dimension> dimensions = new ArrayList<>(dynamicPropertyNames.size());
for (String propertyName : dynamicPropertyNames) { for (final String propertyName : dynamicPropertyNames) {
final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue(); final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isNotBlank(propertyValue)) { if (StringUtils.isNotBlank(propertyValue)) {
dimensions.add(new Dimension().withName(propertyName).withValue(propertyValue)); dimensions.add(Dimension.builder().name(propertyName).value(propertyValue).build());
} }
} }
datum.withDimensions(dimensions); datumBuilder.dimensions(dimensions);
} }
final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest() final PutMetricDataRequest metricDataRequest = PutMetricDataRequest.builder()
.withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue()) .namespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue())
.withMetricData(datum); .metricData(datumBuilder.build())
.build();
putMetricData(context, metricDataRequest); putMetricData(context, metricDataRequest);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile}); getLogger().info("Successfully published cloudwatch metric for {}", flowFile);
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to publish cloudwatch metric for {} due to {}", new Object[]{flowFile, e}); getLogger().error("Failed to publish cloudwatch metric for {} due to {}", flowFile, e);
flowFile = session.penalize(flowFile); flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} }
} }
protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException { protected PutMetricDataResponse putMetricData(final ProcessContext context, final PutMetricDataRequest metricDataRequest) throws AmazonClientException {
final AmazonCloudWatchClient client = getClient(context); final CloudWatchClient client = getClient(context);
final PutMetricDataResult result = client.putMetricData(metricDataRequest); final PutMetricDataResponse result = client.putMetricData(metricDataRequest);
return result; return result;
} }

View File

@ -17,12 +17,12 @@
package org.apache.nifi.processors.aws.cloudwatch; package org.apache.nifi.processors.aws.cloudwatch;
import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonClientException;
import com.amazonaws.services.cloudwatch.model.MetricDatum; import org.apache.nifi.processor.ProcessContext;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
import java.util.List; import java.util.List;
import org.apache.nifi.processor.ProcessContext;
/** /**
@ -33,14 +33,14 @@ public class MockPutCloudWatchMetric extends PutCloudWatchMetric {
protected String actualNamespace; protected String actualNamespace;
protected List<MetricDatum> actualMetricData; protected List<MetricDatum> actualMetricData;
protected AmazonClientException throwException; protected AmazonClientException throwException;
protected PutMetricDataResult result = new PutMetricDataResult(); protected PutMetricDataResponse result = PutMetricDataResponse.builder().build();
protected int putMetricDataCallCount = 0; protected int putMetricDataCallCount = 0;
protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException { protected PutMetricDataResponse putMetricData(final ProcessContext context, final PutMetricDataRequest metricDataRequest) throws AmazonClientException {
putMetricDataCallCount++; putMetricDataCallCount++;
actualNamespace = metricDataRequest.getNamespace(); actualNamespace = metricDataRequest.namespace();
actualMetricData = metricDataRequest.getMetricData(); actualMetricData = metricDataRequest.metricData();
if (throwException != null) { if (throwException != null) {
throw throwException; throw throwException;

View File

@ -16,24 +16,26 @@
*/ */
package org.apache.nifi.processors.aws.cloudwatch; package org.apache.nifi.processors.aws.cloudwatch;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Stream;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** /**
* Unit tests for {@link PutCloudWatchMetric}. * Unit tests for {@link PutCloudWatchMetric}.
@ -59,8 +61,8 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
assertEquals("TestMetric", datum.getMetricName()); assertEquals("TestMetric", datum.metricName());
assertEquals(1d, datum.getValue(), 0.0001d); assertEquals(1d, datum.value(), 0.0001d);
} }
@Test @Test
@ -150,8 +152,8 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
assertEquals("TestMetric", datum.getMetricName()); assertEquals("TestMetric", datum.metricName());
assertEquals(1.23d, datum.getValue(), 0.0001d); assertEquals(1.23d, datum.value(), 0.0001d);
} }
@Test @Test
@ -179,11 +181,11 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
assertEquals("TestMetric", datum.getMetricName()); assertEquals("TestMetric", datum.metricName());
assertEquals(1.0d, datum.getStatisticValues().getMinimum(), 0.0001d); assertEquals(1.0d, datum.statisticValues().minimum(), 0.0001d);
assertEquals(2.0d, datum.getStatisticValues().getMaximum(), 0.0001d); assertEquals(2.0d, datum.statisticValues().maximum(), 0.0001d);
assertEquals(3.0d, datum.getStatisticValues().getSum(), 0.0001d); assertEquals(3.0d, datum.statisticValues().sum(), 0.0001d);
assertEquals(2.0d, datum.getStatisticValues().getSampleCount(), 0.0001d); assertEquals(2.0d, datum.statisticValues().sampleCount(), 0.0001d);
} }
@Test @Test
@ -209,16 +211,16 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
assertEquals("TestMetric", datum.getMetricName()); assertEquals("TestMetric", datum.metricName());
assertEquals(1d, datum.getValue(), 0.0001d); assertEquals(1d, datum.value(), 0.0001d);
List<Dimension> dimensions = datum.getDimensions(); List<Dimension> dimensions = new ArrayList<>(datum.dimensions());
Collections.sort(dimensions, (d1, d2) -> d1.getName().compareTo(d2.getName())); Collections.sort(dimensions, Comparator.comparing(Dimension::name));
assertEquals(2, dimensions.size()); assertEquals(2, dimensions.size());
assertEquals("dim1", dimensions.get(0).getName()); assertEquals("dim1", dimensions.get(0).name());
assertEquals("1", dimensions.get(0).getValue()); assertEquals("1", dimensions.get(0).value());
assertEquals("dim2", dimensions.get(1).getName()); assertEquals("dim2", dimensions.get(1).name());
assertEquals("val2", dimensions.get(1).getValue()); assertEquals("val2", dimensions.get(1).value());
} }
@Test @Test