diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 6b05a42cc4..9cd6ef33f0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -66,10 +66,6 @@
org.apache.nifi
nifi-ssl-context-service-api
-
- com.amazonaws
- aws-java-sdk-cloudwatchmetrics
-
com.amazonaws
aws-java-sdk-sts
@@ -78,6 +74,10 @@
software.amazon.awssdk
sts
+
+ software.amazon.awssdk
+ cloudwatch
+
commons-beanutils
commons-beanutils
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
index 678fadfaa0..2e4a702e07 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
@@ -16,16 +16,7 @@
*/
package org.apache.nifi.processors.aws.cloudwatch;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
+import com.amazonaws.AmazonClientException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -45,19 +36,25 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.v2.AbstractAwsSyncProcessor;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
+import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
+import software.amazon.awssdk.services.cloudwatch.model.StatisticSet;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
-import com.amazonaws.services.cloudwatch.model.Dimension;
-import com.amazonaws.services.cloudwatch.model.MetricDatum;
-import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
-import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
-import com.amazonaws.services.cloudwatch.model.StatisticSet;
-import com.amazonaws.services.cloudwatch.model.StandardUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -67,7 +64,7 @@ import com.amazonaws.services.cloudwatch.model.StandardUnit;
description = "Allows dimension name/value pairs to be added to the metric. AWS supports a maximum of 10 dimensions.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"})
-public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor {
+public class PutCloudWatchMetric extends AbstractAwsSyncProcessor {
public static final Set relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@@ -200,6 +197,11 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
private volatile Set dynamicPropertyNames = new HashSet<>();
+ @Override
+ protected CloudWatchClientBuilder createClientBuilder(final ProcessContext context) {
+ return CloudWatchClient.builder();
+ }
+
@Override
protected List getSupportedPropertyDescriptors() {
return properties;
@@ -262,90 +264,71 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
return problems;
}
- /**
- * Create client using aws credentials provider. This is the preferred way for creating clients
- */
@Override
- protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentialsProvider awsCredentialsProvider, ClientConfiguration clientConfiguration) {
- getLogger().info("Creating client using aws credentials provider");
- return new AmazonCloudWatchClient(awsCredentialsProvider, clientConfiguration);
- }
-
- /**
- * Create client using AWSCredentials
- *
- * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
- */
- @Override
- @Deprecated
- protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentials awsCredentials, ClientConfiguration clientConfiguration) {
- getLogger().debug("Creating client with aws credentials");
- return new AmazonCloudWatchClient(awsCredentials, clientConfiguration);
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
- MetricDatum datum = new MetricDatum();
+ final MetricDatum.Builder datumBuilder = MetricDatum.builder();
try {
- datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue());
+ datumBuilder.metricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue());
final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (valueString != null) {
- datum.setValue(Double.parseDouble(valueString));
+ datumBuilder.value(Double.parseDouble(valueString));
} else {
- StatisticSet statisticSet = new StatisticSet();
- statisticSet.setMaximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue()));
- statisticSet.setMinimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue()));
- statisticSet.setSampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue()));
- statisticSet.setSum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue()));
+ final StatisticSet statisticSet = StatisticSet.builder()
+ .maximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue()))
+ .minimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue()))
+ .sampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue()))
+ .sum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue()))
+ .build();
- datum.setStatisticValues(statisticSet);
+ datumBuilder.statisticValues(statisticSet);
}
final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
if (timestamp != null) {
- datum.setTimestamp(new Date(Long.parseLong(timestamp)));
+ datumBuilder.timestamp(new Date(Long.parseLong(timestamp)).toInstant());
}
final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue();
if (unit != null) {
- datum.setUnit(unit);
+ datumBuilder.unit(unit);
}
// add dynamic properties as dimensions
if (!dynamicPropertyNames.isEmpty()) {
final List dimensions = new ArrayList<>(dynamicPropertyNames.size());
- for (String propertyName : dynamicPropertyNames) {
+ for (final String propertyName : dynamicPropertyNames) {
final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isNotBlank(propertyValue)) {
- dimensions.add(new Dimension().withName(propertyName).withValue(propertyValue));
+ dimensions.add(Dimension.builder().name(propertyName).value(propertyValue).build());
}
}
- datum.withDimensions(dimensions);
+ datumBuilder.dimensions(dimensions);
}
- final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest()
- .withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue())
- .withMetricData(datum);
+ final PutMetricDataRequest metricDataRequest = PutMetricDataRequest.builder()
+ .namespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue())
+ .metricData(datumBuilder.build())
+ .build();
putMetricData(context, metricDataRequest);
session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile});
+ getLogger().info("Successfully published cloudwatch metric for {}", flowFile);
} catch (final Exception e) {
- getLogger().error("Failed to publish cloudwatch metric for {} due to {}", new Object[]{flowFile, e});
+ getLogger().error("Failed to publish cloudwatch metric for {} due to {}", flowFile, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
- protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException {
- final AmazonCloudWatchClient client = getClient(context);
- final PutMetricDataResult result = client.putMetricData(metricDataRequest);
+ protected PutMetricDataResponse putMetricData(final ProcessContext context, final PutMetricDataRequest metricDataRequest) throws AmazonClientException {
+ final CloudWatchClient client = getClient(context);
+ final PutMetricDataResponse result = client.putMetricData(metricDataRequest);
return result;
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
index 84102d0d38..b5aae1f10c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
@@ -17,12 +17,12 @@
package org.apache.nifi.processors.aws.cloudwatch;
import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.cloudwatch.model.MetricDatum;
-import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
-import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+import org.apache.nifi.processor.ProcessContext;
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
import java.util.List;
-import org.apache.nifi.processor.ProcessContext;
/**
@@ -33,14 +33,14 @@ public class MockPutCloudWatchMetric extends PutCloudWatchMetric {
protected String actualNamespace;
protected List actualMetricData;
protected AmazonClientException throwException;
- protected PutMetricDataResult result = new PutMetricDataResult();
+ protected PutMetricDataResponse result = PutMetricDataResponse.builder().build();
protected int putMetricDataCallCount = 0;
- protected PutMetricDataResult putMetricData(ProcessContext context, PutMetricDataRequest metricDataRequest) throws AmazonClientException {
+ protected PutMetricDataResponse putMetricData(final ProcessContext context, final PutMetricDataRequest metricDataRequest) throws AmazonClientException {
putMetricDataCallCount++;
- actualNamespace = metricDataRequest.getNamespace();
- actualMetricData = metricDataRequest.getMetricData();
+ actualNamespace = metricDataRequest.namespace();
+ actualMetricData = metricDataRequest.metricData();
if (throwException != null) {
throw throwException;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
index a0cfcd518c..8574c91490 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
@@ -16,24 +16,26 @@
*/
package org.apache.nifi.processors.aws.cloudwatch;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.stream.Stream;
-
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
-import com.amazonaws.services.cloudwatch.model.Dimension;
-import com.amazonaws.services.cloudwatch.model.MetricDatum;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Unit tests for {@link PutCloudWatchMetric}.
@@ -59,8 +61,8 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
- assertEquals("TestMetric", datum.getMetricName());
- assertEquals(1d, datum.getValue(), 0.0001d);
+ assertEquals("TestMetric", datum.metricName());
+ assertEquals(1d, datum.value(), 0.0001d);
}
@Test
@@ -150,8 +152,8 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
- assertEquals("TestMetric", datum.getMetricName());
- assertEquals(1.23d, datum.getValue(), 0.0001d);
+ assertEquals("TestMetric", datum.metricName());
+ assertEquals(1.23d, datum.value(), 0.0001d);
}
@Test
@@ -179,11 +181,11 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
- assertEquals("TestMetric", datum.getMetricName());
- assertEquals(1.0d, datum.getStatisticValues().getMinimum(), 0.0001d);
- assertEquals(2.0d, datum.getStatisticValues().getMaximum(), 0.0001d);
- assertEquals(3.0d, datum.getStatisticValues().getSum(), 0.0001d);
- assertEquals(2.0d, datum.getStatisticValues().getSampleCount(), 0.0001d);
+ assertEquals("TestMetric", datum.metricName());
+ assertEquals(1.0d, datum.statisticValues().minimum(), 0.0001d);
+ assertEquals(2.0d, datum.statisticValues().maximum(), 0.0001d);
+ assertEquals(3.0d, datum.statisticValues().sum(), 0.0001d);
+ assertEquals(2.0d, datum.statisticValues().sampleCount(), 0.0001d);
}
@Test
@@ -209,16 +211,16 @@ public class TestPutCloudWatchMetric {
assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
- assertEquals("TestMetric", datum.getMetricName());
- assertEquals(1d, datum.getValue(), 0.0001d);
+ assertEquals("TestMetric", datum.metricName());
+ assertEquals(1d, datum.value(), 0.0001d);
- List dimensions = datum.getDimensions();
- Collections.sort(dimensions, (d1, d2) -> d1.getName().compareTo(d2.getName()));
+ List dimensions = new ArrayList<>(datum.dimensions());
+ Collections.sort(dimensions, Comparator.comparing(Dimension::name));
assertEquals(2, dimensions.size());
- assertEquals("dim1", dimensions.get(0).getName());
- assertEquals("1", dimensions.get(0).getValue());
- assertEquals("dim2", dimensions.get(1).getName());
- assertEquals("val2", dimensions.get(1).getValue());
+ assertEquals("dim1", dimensions.get(0).name());
+ assertEquals("1", dimensions.get(0).value());
+ assertEquals("dim2", dimensions.get(1).name());
+ assertEquals("val2", dimensions.get(1).value());
}
@Test