NIFI-3856 PutCloudWatchMetric Stats, Dimensions

PutCloudWatchMetric support for StatisticSets and Dimensions

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #1775.
This commit is contained in:
Tim Reardon 2017-05-09 14:00:49 -04:00 committed by James Wing
parent 361a58c531
commit dc44e30698
2 changed files with 292 additions and 9 deletions

View File

@ -16,13 +16,17 @@
*/ */
package org.apache.nifi.processors.aws.cloudwatch; package org.apache.nifi.processors.aws.cloudwatch;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -32,6 +36,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -45,15 +50,20 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum; import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
import com.amazonaws.services.cloudwatch.model.StatisticSet;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Publishes metrics to Amazon CloudWatch. Metric can be either a single value, or a StatisticSet comprised of "+
"minimum, maximum, sum and sample count.")
@DynamicProperty(name = "Dimension Name", value = "Dimension Value",
description = "Allows dimension name/value pairs to be added to the metric. AWS supports a maximum of 10 dimensions.",
supportsExpressionLanguage = true)
@Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"}) @Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"})
@CapabilityDescription("Publishes metrics to Amazon CloudWatch")
public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor<AmazonCloudWatchClient> { public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor<AmazonCloudWatchClient> {
public static final Set<Relationship> relationships = Collections.unmodifiableSet( public static final Set<Relationship> relationships = Collections.unmodifiableSet(
@ -89,7 +99,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
public static final PropertyDescriptor METRIC_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor METRIC_NAME = new PropertyDescriptor.Builder()
.name("MetricName") .name("MetricName")
.displayName("MetricName") .displayName("Metric Name")
.description("The name of the metric") .description("The name of the metric")
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.required(true) .required(true)
@ -101,7 +111,7 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
.displayName("Value") .displayName("Value")
.description("The value for the metric. Must be a double") .description("The value for the metric. Must be a double")
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.required(true) .required(false)
.addValidator(DOUBLE_VALIDATOR) .addValidator(DOUBLE_VALIDATOR)
.build(); .build();
@ -123,23 +133,113 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor MAXIMUM = new PropertyDescriptor.Builder()
.name("maximum")
.displayName("Maximum")
.description("The maximum value of the sample set. Must be a double")
.expressionLanguageSupported(true)
.required(false)
.addValidator(DOUBLE_VALIDATOR)
.build();
public static final PropertyDescriptor MINIMUM = new PropertyDescriptor.Builder()
.name("minimum")
.displayName("Minimum")
.description("The minimum value of the sample set. Must be a double")
.expressionLanguageSupported(true)
.required(false)
.addValidator(DOUBLE_VALIDATOR)
.build();
public static final PropertyDescriptor SAMPLECOUNT = new PropertyDescriptor.Builder()
.name("sampleCount")
.displayName("Sample Count")
.description("The number of samples used for the statistic set. Must be a double")
.expressionLanguageSupported(true)
.required(false)
.addValidator(DOUBLE_VALIDATOR)
.build();
public static final PropertyDescriptor SUM = new PropertyDescriptor.Builder()
.name("sum")
.displayName("Sum")
.description("The sum of values for the sample set. Must be a double")
.expressionLanguageSupported(true)
.required(false)
.addValidator(DOUBLE_VALIDATOR)
.build();
public static final List<PropertyDescriptor> properties = public static final List<PropertyDescriptor> properties =
Collections.unmodifiableList( Collections.unmodifiableList(
Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, TIMESTAMP, UNIT, REGION, ACCESS_KEY, SECRET_KEY, Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, MAXIMUM, MINIMUM, SAMPLECOUNT, SUM, TIMESTAMP,
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, UNIT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT) TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)
); );
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties; return properties;
} }
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.isDynamic()) {
final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
if (newValue == null) { // removing a property
newDynamicPropertyNames.remove(descriptor.getName());
} else if (oldValue == null) { // adding a new property
newDynamicPropertyNames.add(descriptor.getName());
}
this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
}
}
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return relationships; return relationships;
} }
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
Collection<ValidationResult> problems = super.customValidate(validationContext);
final boolean valueSet = validationContext.getProperty(VALUE).isSet();
final boolean maxSet = validationContext.getProperty(MAXIMUM).isSet();
final boolean minSet = validationContext.getProperty(MINIMUM).isSet();
final boolean sampleCountSet = validationContext.getProperty(SAMPLECOUNT).isSet();
final boolean sumSet = validationContext.getProperty(SUM).isSet();
final boolean completeStatisticSet = (maxSet && minSet && sampleCountSet && sumSet);
final boolean anyStatisticSetValue = (maxSet || minSet || sampleCountSet || sumSet);
if (valueSet && anyStatisticSetValue) {
problems.add(new ValidationResult.Builder().subject("Metric").valid(false)
.explanation("Cannot set both Value and StatisticSet(Maximum, Minimum, SampleCount, Sum) properties").build());
} else if (!valueSet && !completeStatisticSet) {
problems.add(new ValidationResult.Builder().subject("Metric").valid(false)
.explanation("Must set either Value or complete StatisticSet(Maximum, Minimum, SampleCount, Sum) properties").build());
}
if (dynamicPropertyNames.size() > 10) {
problems.add(new ValidationResult.Builder().subject("Metric").valid(false)
.explanation("Cannot set more than 10 dimensions").build());
}
return problems;
}
/** /**
* Create client using aws credentials provider. This is the preferred way for creating clients * Create client using aws credentials provider. This is the preferred way for creating clients
*/ */
@ -171,7 +271,18 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
try { try {
datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue()); datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue());
datum.setValue(Double.parseDouble(context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue())); final String valueString = context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue();
if (valueString != null) {
datum.setValue(Double.parseDouble(valueString));
} else {
StatisticSet statisticSet = new StatisticSet();
statisticSet.setMaximum(Double.parseDouble(context.getProperty(MAXIMUM).evaluateAttributeExpressions(flowFile).getValue()));
statisticSet.setMinimum(Double.parseDouble(context.getProperty(MINIMUM).evaluateAttributeExpressions(flowFile).getValue()));
statisticSet.setSampleCount(Double.parseDouble(context.getProperty(SAMPLECOUNT).evaluateAttributeExpressions(flowFile).getValue()));
statisticSet.setSum(Double.parseDouble(context.getProperty(SUM).evaluateAttributeExpressions(flowFile).getValue()));
datum.setStatisticValues(statisticSet);
}
final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
if (timestamp != null) { if (timestamp != null) {
@ -183,6 +294,18 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
datum.setUnit(unit); datum.setUnit(unit);
} }
// add dynamic properties as dimensions
if (!dynamicPropertyNames.isEmpty()) {
final List<Dimension> dimensions = new ArrayList<>(dynamicPropertyNames.size());
for (String propertyName : dynamicPropertyNames) {
final String propertyValue = context.getProperty(propertyName).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isNotBlank(propertyValue)) {
dimensions.add(new Dimension().withName(propertyName).withValue(propertyValue));
}
}
datum.withDimensions(dimensions);
}
final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest() final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest()
.withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue()) .withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue())
.withMetricData(datum); .withMetricData(datum);
@ -204,4 +327,4 @@ public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor
return result; return result;
} }
} }

View File

@ -16,12 +16,16 @@
*/ */
package org.apache.nifi.processors.aws.cloudwatch; package org.apache.nifi.processors.aws.cloudwatch;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.InvalidParameterValueException; import com.amazonaws.services.cloudwatch.model.InvalidParameterValueException;
import com.amazonaws.services.cloudwatch.model.MetricDatum; import com.amazonaws.services.cloudwatch.model.MetricDatum;
import org.junit.Test; import org.junit.Test;
@ -66,6 +70,63 @@ public class TestPutCloudWatchMetric {
runner.assertNotValid(); runner.assertNotValid();
} }
@Test
public void testMissingBothValueAndStatisticSetInvalid() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.assertNotValid();
}
@Test
public void testContainsBothValueAndStatisticSetInvalid() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
runner.setProperty(PutCloudWatchMetric.UNIT, "Count");
runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
runner.setProperty(PutCloudWatchMetric.MINIMUM, "1.0");
runner.setProperty(PutCloudWatchMetric.MAXIMUM, "2.0");
runner.setProperty(PutCloudWatchMetric.SUM, "3.0");
runner.setProperty(PutCloudWatchMetric.SAMPLECOUNT, "2");
runner.assertNotValid();
}
@Test
public void testContainsIncompleteStatisticSetInvalid() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.UNIT, "Count");
runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
runner.setProperty(PutCloudWatchMetric.MINIMUM, "1.0");
runner.setProperty(PutCloudWatchMetric.MAXIMUM, "2.0");
runner.setProperty(PutCloudWatchMetric.SUM, "3.0");
// missing sample count
runner.assertNotValid();
}
@Test
public void testContainsBothValueAndIncompleteStatisticSetInvalid() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
runner.setProperty(PutCloudWatchMetric.UNIT, "Count");
runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
runner.setProperty(PutCloudWatchMetric.MINIMUM, "1.0");
runner.assertNotValid();
}
@Test @Test
public void testMetricExpressionValid() throws Exception { public void testMetricExpressionValid() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
@ -89,6 +150,105 @@ public class TestPutCloudWatchMetric {
Assert.assertEquals(1.23d, datum.getValue(), 0.0001d); Assert.assertEquals(1.23d, datum.getValue(), 0.0001d);
} }
@Test
public void testStatisticSet() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.MINIMUM, "${metric.min}");
runner.setProperty(PutCloudWatchMetric.MAXIMUM, "${metric.max}");
runner.setProperty(PutCloudWatchMetric.SUM, "${metric.sum}");
runner.setProperty(PutCloudWatchMetric.SAMPLECOUNT, "${metric.count}");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();
attributes.put("metric.min", "1");
attributes.put("metric.max", "2");
attributes.put("metric.sum", "3");
attributes.put("metric.count", "2");
runner.enqueue(new byte[] {}, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1);
Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
Assert.assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
Assert.assertEquals("TestMetric", datum.getMetricName());
Assert.assertEquals(1.0d, datum.getStatisticValues().getMinimum(), 0.0001d);
Assert.assertEquals(2.0d, datum.getStatisticValues().getMaximum(), 0.0001d);
Assert.assertEquals(3.0d, datum.getStatisticValues().getSum(), 0.0001d);
Assert.assertEquals(2.0d, datum.getStatisticValues().getSampleCount(), 0.0001d);
}
@Test
public void testDimensions() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
runner.setProperty(PutCloudWatchMetric.UNIT, "Count");
runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim1").build(), "${metric.dim1}");
runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim2").build(), "val2");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();
attributes.put("metric.dim1", "1");
runner.enqueue(new byte[] {}, attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1);
Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount);
Assert.assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace);
MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0);
Assert.assertEquals("TestMetric", datum.getMetricName());
Assert.assertEquals(1d, datum.getValue(), 0.0001d);
List<Dimension> dimensions = datum.getDimensions();
Collections.sort(dimensions, (d1, d2) -> d1.getName().compareTo(d2.getName()));
Assert.assertEquals(2, dimensions.size());
Assert.assertEquals("dim1", dimensions.get(0).getName());
Assert.assertEquals("1", dimensions.get(0).getValue());
Assert.assertEquals("dim2", dimensions.get(1).getName());
Assert.assertEquals("val2", dimensions.get(1).getValue());
}
@Test
public void testMaximumDimensions() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
runner.setProperty(PutCloudWatchMetric.UNIT, "Count");
runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
for (int i=0; i < 10; i++) {
runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim" + i).build(), "0");
}
runner.assertValid();
}
@Test
public void testTooManyDimensions() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();
final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric);
runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
runner.setProperty(PutCloudWatchMetric.UNIT, "Count");
runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
for (int i=0; i < 11; i++) {
runner.setProperty(new PropertyDescriptor.Builder().dynamic(true).name("dim" + i).build(), "0");
}
runner.assertNotValid();
}
@Test @Test
public void testMetricExpressionInvalidRoutesToFailure() throws Exception { public void testMetricExpressionInvalidRoutesToFailure() throws Exception {
MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric();