diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java index 56f2d99f2b..533ca31e9a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java @@ -16,13 +16,17 @@ */ package org.apache.nifi.processors.aws.cloudwatch; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -32,6 +36,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -45,15 +50,20 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; +import com.amazonaws.services.cloudwatch.model.Dimension; import com.amazonaws.services.cloudwatch.model.MetricDatum; import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; - +import com.amazonaws.services.cloudwatch.model.StatisticSet; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Publishes metrics to Amazon CloudWatch. Metric can be either a single value, or a StatisticSet comprised of "+ + "minimum, maximum, sum and sample count.") +@DynamicProperty(name = "Dimension Name", value = "Dimension Value", + description = "Allows dimension name/value pairs to be added to the metric. AWS supports a maximum of 10 dimensions.", + supportsExpressionLanguage = true) @Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"}) -@CapabilityDescription("Publishes metrics to Amazon CloudWatch") public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor { public static final Set relationships = Collections.unmodifiableSet( @@ -89,7 +99,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor public static final PropertyDescriptor METRIC_NAME = new PropertyDescriptor.Builder() .name("MetricName") - .displayName("MetricName") + .displayName("Metric Name") .description("The name of the metric") .expressionLanguageSupported(true) .required(true) @@ -101,7 +111,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor .displayName("Value") .description("The value for the metric. Must be a double") .expressionLanguageSupported(true) - .required(true) + .required(false) .addValidator(DOUBLE_VALIDATOR) .build(); @@ -123,23 +133,113 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor MAXIMUM = new PropertyDescriptor.Builder() + .name("maximum") + .displayName("Maximum") + .description("The maximum value of the sample set. Must be a double") + .expressionLanguageSupported(true) + .required(false) + .addValidator(DOUBLE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM = new PropertyDescriptor.Builder() + .name("minimum") + .displayName("Minimum") + .description("The minimum value of the sample set. Must be a double") + .expressionLanguageSupported(true) + .required(false) + .addValidator(DOUBLE_VALIDATOR) + .build(); + + public static final PropertyDescriptor SAMPLECOUNT = new PropertyDescriptor.Builder() + .name("sampleCount") + .displayName("Sample Count") + .description("The number of samples used for the statistic set. Must be a double") + .expressionLanguageSupported(true) + .required(false) + .addValidator(DOUBLE_VALIDATOR) + .build(); + + public static final PropertyDescriptor SUM = new PropertyDescriptor.Builder() + .name("sum") + .displayName("Sum") + .description("The sum of values for the sample set. Must be a double") + .expressionLanguageSupported(true) + .required(false) + .addValidator(DOUBLE_VALIDATOR) + .build(); + public static final List properties = Collections.unmodifiableList( - Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, TIMESTAMP, UNIT, REGION, ACCESS_KEY, SECRET_KEY, - CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, - ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT) + Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, MAXIMUM, MINIMUM, SAMPLECOUNT, SUM, TIMESTAMP, + UNIT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, + TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT) ); + private volatile Set dynamicPropertyNames = new HashSet<>(); + @Override protected List getSupportedPropertyDescriptors() { return properties; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.isDynamic()) { + final Set newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames); + if (newValue == null) { // removing a property + newDynamicPropertyNames.remove(descriptor.getName()); + } else if (oldValue == null) { // adding a new property + newDynamicPropertyNames.add(descriptor.getName()); + } + this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames); + } + } + @Override public Set getRelationships() { return relationships; } + @Override + protected Collection customValidate(final ValidationContext validationContext) { + Collection problems = super.customValidate(validationContext); + + final boolean valueSet = validationContext.getProperty(VALUE).isSet(); + final boolean maxSet = validationContext.getProperty(MAXIMUM).isSet(); + final boolean minSet = validationContext.getProperty(MINIMUM).isSet(); + final boolean sampleCountSet = validationContext.getProperty(SAMPLECOUNT).isSet(); + final boolean sumSet = validationContext.getProperty(SUM).isSet(); + + final boolean completeStatisticSet = (maxSet && minSet && sampleCountSet && sumSet); + final boolean anyStatisticSetValue = (maxSet || minSet || sampleCountSet || sumSet); + + if (valueSet && anyStatisticSetValue) { + problems.add(new ValidationResult.Builder().subject("Metric").valid(false) + .explanation("Cannot set both Value and StatisticSet(Maximum, Minimum, SampleCount, Sum) properties").build()); + } else if (!valueSet && !completeStatisticSet) { + problems.add(new ValidationResult.Builder().subject("Metric").valid(false) + .explanation("Must set either Value or complete StatisticSet(Maximum, Minimum, SampleCount, Sum) properties").build()); + } + + if (dynamicPropertyNames.size() > 10) { + problems.add(new ValidationResult.Builder().subject("Metric").valid(false) + .explanation("Cannot set more than 10 dimensions").build()); + } + + return problems; + } + /** * Create client using aws credentials provider. This is the preferred way for creating clients */ @@ -171,7 +271,18 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor try { datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue()); - datum.setValue(Double.parseDouble(context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue())); + final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue(); + if (valueString != null) { + datum.setValue(Double.parseDouble(valueString)); + } else { + StatisticSet statisticSet = new StatisticSet(); + statisticSet.setMaximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue())); + statisticSet.setMinimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue())); + statisticSet.setSampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue())); + statisticSet.setSum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue())); + + datum.setStatisticValues(statisticSet); + } final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); if (timestamp != null) { @@ -183,6 +294,18 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor datum.setUnit(unit); } + // add dynamic properties as dimensions + if (!dynamicPropertyNames.isEmpty()) { + final List dimensions = new ArrayList<>(dynamicPropertyNames.size()); + for (String propertyName : dynamicPropertyNames) { + final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isNotBlank(propertyValue)) { + dimensions.add(new Dimension().withName(propertyName).withValue(propertyValue)); + } + } + datum.withDimensions(dimensions); + } + final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest() .withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue()) .withMetricData(datum); @@ -204,4 +327,4 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor return result; } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java index dbc7f8c2d7..28bb4e42a6 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java @@ -16,12 +16,16 @@ */ package org.apache.nifi.processors.aws.cloudwatch; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.HashMap; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import com.amazonaws.services.cloudwatch.model.Dimension; import com.amazonaws.services.cloudwatch.model.InvalidParameterValueException; import com.amazonaws.services.cloudwatch.model.MetricDatum; import org.junit.Test; @@ -66,6 +70,63 @@ public class TestPutCloudWatchMetric { runner.assertNotValid(); } + @Test + public void testMissingBothValueAndStatisticSetInvalid() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.assertNotValid(); + } + + @Test + public void testContainsBothValueAndStatisticSetInvalid() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + runner.setProperty(PutCloudWatchMetric.MINIMUM, "1.0"); + runner.setProperty(PutCloudWatchMetric.MAXIMUM, "2.0"); + runner.setProperty(PutCloudWatchMetric.SUM, "3.0"); + runner.setProperty(PutCloudWatchMetric.SAMPLECOUNT, "2"); + runner.assertNotValid(); + } + + @Test + public void testContainsIncompleteStatisticSetInvalid() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + runner.setProperty(PutCloudWatchMetric.MINIMUM, "1.0"); + runner.setProperty(PutCloudWatchMetric.MAXIMUM, "2.0"); + runner.setProperty(PutCloudWatchMetric.SUM, "3.0"); + // missing sample count + runner.assertNotValid(); + } + + @Test + public void testContainsBothValueAndIncompleteStatisticSetInvalid() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + runner.setProperty(PutCloudWatchMetric.MINIMUM, "1.0"); + runner.assertNotValid(); + } + @Test public void testMetricExpressionValid() throws Exception { MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); @@ -89,6 +150,105 @@ public class TestPutCloudWatchMetric { Assert.assertEquals(1.23d, datum.getValue(), 0.0001d); } + @Test + public void testStatisticSet() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.MINIMUM, "${metric.min}"); + runner.setProperty(PutCloudWatchMetric.MAXIMUM, "${metric.max}"); + runner.setProperty(PutCloudWatchMetric.SUM, "${metric.sum}"); + runner.setProperty(PutCloudWatchMetric.SAMPLECOUNT, "${metric.count}"); + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("metric.min", "1"); + attributes.put("metric.max", "2"); + attributes.put("metric.sum", "3"); + attributes.put("metric.count", "2"); + runner.enqueue(new byte[] {}, attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1); + Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); + Assert.assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); + MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); + Assert.assertEquals("TestMetric", datum.getMetricName()); + Assert.assertEquals(1.0d, datum.getStatisticValues().getMinimum(), 0.0001d); + Assert.assertEquals(2.0d, datum.getStatisticValues().getMaximum(), 0.0001d); + Assert.assertEquals(3.0d, datum.getStatisticValues().getSum(), 0.0001d); + Assert.assertEquals(2.0d, datum.getStatisticValues().getSampleCount(), 0.0001d); + } + + @Test + public void testDimensions() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim1").build(), "${metric.dim1}"); + runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim2").build(), "val2"); + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("metric.dim1", "1"); + runner.enqueue(new byte[] {}, attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1); + Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); + Assert.assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); + MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); + Assert.assertEquals("TestMetric", datum.getMetricName()); + Assert.assertEquals(1d, datum.getValue(), 0.0001d); + + List dimensions = datum.getDimensions(); + Collections.sort(dimensions, (d1, d2) -> d1.getName().compareTo(d2.getName())); + Assert.assertEquals(2, dimensions.size()); + Assert.assertEquals("dim1", dimensions.get(0).getName()); + Assert.assertEquals("1", dimensions.get(0).getValue()); + Assert.assertEquals("dim2", dimensions.get(1).getName()); + Assert.assertEquals("val2", dimensions.get(1).getValue()); + } + + @Test + public void testMaximumDimensions() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + for (int i=0; i < 10; i++) { + runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim" + i).build(), "0"); + } + runner.assertValid(); + } + + @Test + public void testTooManyDimensions() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + for (int i=0; i < 11; i++) { + runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim" + i).build(), "0"); + } + runner.assertNotValid(); + } + @Test public void testMetricExpressionInvalidRoutesToFailure() throws Exception { MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();