From 7107616420ed31c057ff7bf7f06bf36e5740b3ea Mon Sep 17 00:00:00 2001 From: Edgardo Date: Tue, 11 Oct 2016 14:07:42 -0400 Subject: [PATCH] NIFI-2872 Create PutCloudWatchMetric Processor This closes #1125. Signed-off-by: James Wing --- .../aws/cloudwatch/PutCloudWatchMetric.java | 207 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 4 +- .../aws/cloudwatch/ITPutCloudWatchMetric.java | 73 ++++++ .../cloudwatch/MockPutCloudWatchMetric.java | 49 +++++ .../cloudwatch/TestPutCloudWatchMetric.java | 151 +++++++++++++ 5 files changed, 483 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java new file mode 100644 index 0000000000..56f2d99f2b --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.cloudwatch; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; +import com.amazonaws.services.cloudwatch.model.MetricDatum; +import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; +import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; + + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"}) +@CapabilityDescription("Publishes metrics to Amazon CloudWatch") +public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor { + + public static final Set relationships = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + private static final Validator DOUBLE_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return (new ValidationResult.Builder()).subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + } else { + String reason = null; + + try { + Double.parseDouble(input); + } catch (NumberFormatException e) { + reason = "not a valid Double"; + } + + return (new ValidationResult.Builder()).subject(subject).input(input).explanation(reason).valid(reason == null).build(); + } + } + }; + + public static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() + .name("Namespace") + .displayName("Namespace") + .description("The namespace for the metric data for CloudWatch") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor METRIC_NAME = new PropertyDescriptor.Builder() + .name("MetricName") + .displayName("MetricName") + .description("The name of the metric") + .expressionLanguageSupported(true) + .required(true) + .addValidator(new StandardValidators.StringLengthValidator(1, 255)) + .build(); + + public static final PropertyDescriptor VALUE = new PropertyDescriptor.Builder() + .name("Value") + .displayName("Value") + .description("The value for the metric. Must be a double") + .expressionLanguageSupported(true) + .required(true) + .addValidator(DOUBLE_VALIDATOR) + .build(); + + public static final PropertyDescriptor TIMESTAMP = new PropertyDescriptor.Builder() + .name("Timestamp") + .displayName("Timestamp") + .description("A point in time expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC. If not specified, the default value is set to the time the metric data was received") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + public static final PropertyDescriptor UNIT = new PropertyDescriptor.Builder() + .name("Unit") + .displayName("Unit") + .description("The unit of the metric. (e.g Seconds, Bytes, Megabytes, Percent, Count, Kilobytes/Second, Terabits/Second, Count/Second) For details see http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final List properties = + Collections.unmodifiableList( + Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, TIMESTAMP, UNIT, REGION, ACCESS_KEY, SECRET_KEY, + CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT) + ); + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + /** + * Create client using aws credentials provider. This is the preferred way for creating clients + */ + + @Override + protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentialsProvider awsCredentialsProvider, ClientConfiguration clientConfiguration) { + getLogger().info("Creating client using aws credentials provider"); + return new AmazonCloudWatchClient(awsCredentialsProvider, clientConfiguration); + } + + /** + * Create client using AWSCredentials + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentials awsCredentials, ClientConfiguration clientConfiguration) { + getLogger().debug("Creating client with aws credentials"); + return new AmazonCloudWatchClient(awsCredentials, clientConfiguration); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + MetricDatum datum = new MetricDatum(); + + try { + datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue()); + datum.setValue(Double.parseDouble(context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue())); + + final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); + if (timestamp != null) { + datum.setTimestamp(new Date(Long.parseLong(timestamp))); + } + + final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue(); + if (unit != null) { + datum.setUnit(unit); + } + + final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest() + .withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue()) + .withMetricData(datum); + + putMetricData(metricDataRequest); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile}); + } catch (final Exception e) { + getLogger().error("Failed to publish cloudwatch metric for {} due to {}", new Object[]{flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + + } + + protected PutMetricDataResult putMetricData(PutMetricDataRequest metricDataRequest) throws AmazonClientException { + final AmazonCloudWatchClient client = getClient(); + final PutMetricDataResult result = client.putMetricData(metricDataRequest); + return result; + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index df265c3cf5..27e39f0505 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -20,9 +20,11 @@ org.apache.nifi.processors.aws.sns.PutSNS org.apache.nifi.processors.aws.sqs.GetSQS org.apache.nifi.processors.aws.sqs.PutSQS org.apache.nifi.processors.aws.sqs.DeleteSQS -org.apache.nifi.processors.aws.lambda.PutLambda +org.apache.nifi.processors.aws.lambda.PutLambda org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose org.apache.nifi.processors.aws.dynamodb.GetDynamoDB org.apache.nifi.processors.aws.dynamodb.PutDynamoDB org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream +org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric + diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java new file mode 100644 index 0000000000..48eeecf19d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.cloudwatch; + +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.sns.PutSNS; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; + +/** + * Provides integration level testing with actual AWS Cloudwatcch resources for {@link PutCloudWatchMetric} and requires additional configuration and resources to work. + */ +public class ITPutCloudWatchMetric { + + private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Test + public void testPublish() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutCloudWatchMetric()); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "Test"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "Test"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.CREDENTIALS_FILE, CREDENTIALS_FILE); + + runner.enqueue(new byte[] {}); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1); + } + + @Test + public void testPublishWithCredentialsProviderService() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(new PutCloudWatchMetric()); + runner.setValidateExpressionUsage(false); + + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "Test"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "Test"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutSNS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.enqueue(new byte[] {}); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1); + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java new file mode 100644 index 0000000000..839c4d6bc6 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.cloudwatch; + +import java.util.List; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.cloudwatch.model.MetricDatum; +import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; +import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; + + +/** + * Simple mock {@link PutCloudWatchMetric} processor for testing. + */ +public class MockPutCloudWatchMetric extends PutCloudWatchMetric { + + protected String actualNamespace; + protected List actualMetricData; + protected AmazonClientException throwException; + protected PutMetricDataResult result = new PutMetricDataResult(); + protected int putMetricDataCallCount = 0; + + protected PutMetricDataResult putMetricData(PutMetricDataRequest metricDataRequest) throws AmazonClientException { + putMetricDataCallCount++; + actualNamespace = metricDataRequest.getNamespace(); + actualMetricData = metricDataRequest.getMetricData(); + + if (throwException != null) { + throw throwException; + } + + return result; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java new file mode 100644 index 0000000000..dbc7f8c2d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.cloudwatch; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.cloudwatch.model.InvalidParameterValueException; +import com.amazonaws.services.cloudwatch.model.MetricDatum; +import org.junit.Test; +import org.junit.Assert; + +/** + * Unit tests for {@link PutCloudWatchMetric}. + */ +public class TestPutCloudWatchMetric { + + @Test + public void testPutSimpleMetric() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1.0"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575"); + runner.assertValid(); + + runner.enqueue(new byte[] {}); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1); + Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); + Assert.assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); + MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); + Assert.assertEquals("TestMetric", datum.getMetricName()); + Assert.assertEquals(1d, datum.getValue(), 0.0001d); + } + + @Test + public void testValueLiteralDoubleInvalid() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "nan"); + runner.assertNotValid(); + } + + @Test + public void testMetricExpressionValid() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "${metric.value}"); + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("metric.value", "1.23"); + runner.enqueue(new byte[] {}, attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1); + Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); + Assert.assertEquals("TestNamespace", mockPutCloudWatchMetric.actualNamespace); + MetricDatum datum = mockPutCloudWatchMetric.actualMetricData.get(0); + Assert.assertEquals("TestMetric", datum.getMetricName()); + Assert.assertEquals(1.23d, datum.getValue(), 0.0001d); + } + + @Test + public void testMetricExpressionInvalidRoutesToFailure() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.VALUE, "${metric.value}"); + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("metric.value", "nan"); + runner.enqueue(new byte[] {}, attributes); + runner.run(); + + Assert.assertEquals(0, mockPutCloudWatchMetric.putMetricDataCallCount); + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_FAILURE, 1); + } + + @Test + public void testInvalidUnitRoutesToFailure() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + mockPutCloudWatchMetric.throwException = new InvalidParameterValueException("Unit error message"); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.UNIT, "BogusUnit"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1"); + runner.assertValid(); + + runner.enqueue(new byte[] {}); + runner.run(); + + Assert.assertEquals(1, mockPutCloudWatchMetric.putMetricDataCallCount); + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_FAILURE, 1); + } + + @Test + public void testTimestampExpressionInvalidRoutesToFailure() throws Exception { + MockPutCloudWatchMetric mockPutCloudWatchMetric = new MockPutCloudWatchMetric(); + final TestRunner runner = TestRunners.newTestRunner(mockPutCloudWatchMetric); + + runner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace"); + runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric"); + runner.setProperty(PutCloudWatchMetric.UNIT, "Count"); + runner.setProperty(PutCloudWatchMetric.VALUE, "1"); + runner.setProperty(PutCloudWatchMetric.TIMESTAMP, "${timestamp.value}"); + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("timestamp.value", "1476296132575broken"); + runner.enqueue(new byte[] {}, attributes); + runner.run(); + + Assert.assertEquals(0, mockPutCloudWatchMetric.putMetricDataCallCount); + runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_FAILURE, 1); + } + +} \ No newline at end of file