diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 9492564fe0..b8e83bd076 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -242,6 +242,11 @@ language governing permissions and limitations under the License. --> nifi-hbase-nar nar + + org.apache.nifi + nifi-riemann-nar + nar + org.apache.nifi nifi-hbase_1_1_2-client-service-nar diff --git a/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-nar/pom.xml b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-nar/pom.xml new file mode 100644 index 0000000000..dc7402fe64 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-nar/pom.xml @@ -0,0 +1,35 @@ + + + + + nifi-riemann-bundle + org.apache.nifi + 0.4.2-SNAPSHOT + + 4.0.0 + + nifi-riemann-nar + nar + + + org.apache.nifi + nifi-riemann-processors + ${project.version} + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..e677fc8483 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,19 @@ +nifi-riemann-nar +Copyright 2014-2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/pom.xml b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/pom.xml new file mode 100644 index 0000000000..76869ef297 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/pom.xml @@ -0,0 +1,53 @@ + + + + + org.apache.nifi + nifi-riemann-bundle + 0.4.2-SNAPSHOT + + 4.0.0 + + nifi-riemann-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-properties + + + org.apache.nifi + nifi-mock + test + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java new file mode 100644 index 0000000000..6ba2ae0a27 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.riemann; + +import com.aphyr.riemann.Proto; +import com.aphyr.riemann.Proto.Event; +import com.aphyr.riemann.client.RiemannClient; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Tags({"riemann", "monitoring", "metrics"}) +@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true, + description = "These values will be attached to the Riemann event as a custom attribute", + value = "Any value or expression") +@CapabilityDescription("Send events to Riemann (http://riemann.io) when FlowFiles pass through this processor. " + + "You can use events to notify Riemann that a FlowFile passed through, or you can attach a more " + + "meaningful metric, such as, the time a FlowFile took to get to this processor. All attributes attached to " + + "events support the NiFi Expression Language.") +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +public class PutRiemann extends AbstractProcessor { + protected enum Transport { + TCP, UDP + } + + protected volatile RiemannClient riemannClient = null; + protected volatile Transport transport; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Metrics successfully written to Riemann") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Metrics which failed to write to Riemann") + .build(); + + + public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder() + .name("Riemann Address") + .description("Hostname of Riemann server") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder() + .name("Riemann Port") + .description("Port that Riemann is listening on") + .required(true) + .defaultValue("5555") + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder() + .name("Transport Protocol") + .description("Transport protocol to speak to Riemann in") + .required(true) + .allowableValues(new Transport[]{Transport.TCP, Transport.UDP}) + .defaultValue("TCP") + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Batch size for incoming FlowFiles") + .required(false) + .defaultValue("100") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + // Attributes Mappings + public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder() + .name("Service") + .description("Name of service associated to this event (e.g. FTP File Fetched)") + .required(false) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder() + .name("State") + .description("State of service associated to this event in string form (e.g. ok, warning, foo)") + .required(false) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder() + .name("Time") + .description("Time of event in unix epoch seconds (long), default: (current time)") + .required(false) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder() + .name("Host") + .description("A hostname associated to this event (e.g. nifi-app1)") + .required(false) + .defaultValue("${hostname()}") + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder() + .name("TTL") + .description("Floating point value in seconds until Riemann considers this event as \"expired\"") + .required(false) + .addValidator(Validator.VALID) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder() + .name("Metric") + .description("Floating point number associated to this event") + .required(false) + .addValidator(Validator.VALID) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder() + .name("Description") + .description("Description associated to the event") + .required(false) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .build(); + + + public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder() + .name("Tags") + .description("Comma separated list of tags associated to the event") + .required(false) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Timeout") + .description("Timeout in milliseconds when writing events to Riemann") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .build(); + + private volatile List customAttributes = new ArrayList<>(); + private static final Set RELATIONSHIPS = new HashSet<>(); + private static final List LOCAL_PROPERTIES = new ArrayList<>(); + + private volatile int batchSize = -1; + private volatile long writeTimeout = 1000; + + static { + RELATIONSHIPS.add(REL_SUCCESS); + RELATIONSHIPS.add(REL_FAILURE); + LOCAL_PROPERTIES.add(RIEMANN_HOST); + LOCAL_PROPERTIES.add(RIEMANN_PORT); + LOCAL_PROPERTIES.add(TRANSPORT_PROTOCOL); + LOCAL_PROPERTIES.add(TIMEOUT); + LOCAL_PROPERTIES.add(BATCH_SIZE); + LOCAL_PROPERTIES.add(ATTR_DESCRIPTION); + LOCAL_PROPERTIES.add(ATTR_SERVICE); + LOCAL_PROPERTIES.add(ATTR_STATE); + LOCAL_PROPERTIES.add(ATTR_METRIC); + LOCAL_PROPERTIES.add(ATTR_TTL); + LOCAL_PROPERTIES.add(ATTR_TAGS); + LOCAL_PROPERTIES.add(ATTR_HOST); + LOCAL_PROPERTIES.add(ATTR_TIME); + } + + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return LOCAL_PROPERTIES; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .required(false) + .dynamic(true) + .build(); + } + + @OnStopped + public final void cleanUpClient() { + if (riemannClient != null) { + this.riemannClient.close(); + } + this.riemannClient = null; + this.batchSize = -1; + this.customAttributes.clear(); + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws ProcessException { + if (batchSize == -1) { + batchSize = context.getProperty(BATCH_SIZE).asInteger(); + } + if (riemannClient == null || !riemannClient.isConnected()) { + transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue()); + String host = context.getProperty(RIEMANN_HOST).getValue().trim(); + int port = context.getProperty(RIEMANN_PORT).asInteger(); + writeTimeout = context.getProperty(TIMEOUT).asLong(); + RiemannClient client = null; + try { + switch (transport) { + case TCP: + client = RiemannClient.tcp(host, port); + break; + case UDP: + client = RiemannClient.udp(host, port); + break; + } + client.connect(); + riemannClient = client; + } catch (IOException e) { + if (client != null) { + client.close(); + } + context.yield(); + throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage())); + } + } + + if (customAttributes.size() == 0) { + for (Map.Entry property : context.getProperties().entrySet()) { + // only custom defined properties + if (!getSupportedPropertyDescriptors().contains(property.getKey())) { + customAttributes.add(property.getKey()); + } + } + } + } + + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + // Check if the client is currently connected, as a previous trigger could have detected a failure + // in the connection. + if (riemannClient == null || !riemannClient.isConnected()) { + // clean up the client and attempt to re-initialize the processor + cleanUpClient(); + onScheduled(context); + } + + List incomingFlowFiles = session.get(batchSize); + List successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size()); + List eventsQueue = new ArrayList<>(incomingFlowFiles.size()); + for (FlowFile flowFile : incomingFlowFiles) { + try { + eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile)); + successfulFlowFiles.add(flowFile); + } catch (NumberFormatException e) { + getLogger().warn("Unable to create Riemann event.", e); + session.transfer(flowFile, REL_FAILURE); + } + } + try { + if (transport == Transport.TCP) { + Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS); + if (returnMessage == null) { + context.yield(); + throw new ProcessException("Timed out writing to Riemann!"); + } + } else { + riemannClient.sendEvents(eventsQueue); + } + riemannClient.flush(); + session.transfer(successfulFlowFiles, REL_SUCCESS); + session.commit(); + } catch (Exception e) { + context.yield(); + session.transfer(incomingFlowFiles); + session.commit(); + throw new ProcessException("Failed writing to Riemann\n" + e.getMessage()); + } + } + + /** + * Converts a FlowFile into a Riemann Protobuf Event + */ + private static class FlowFileToEvent { + protected static Event fromAttributes(ProcessContext context, List customProperties, + FlowFile flowFile) { + Event.Builder builder = Event.newBuilder(); + + PropertyValue service = context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(service.getValue())) { + builder.setService(service.getValue()); + } + PropertyValue description = context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(description.getValue())) { + builder.setDescription(description.getValue()); + } + PropertyValue metric = context.getProperty(ATTR_METRIC).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(metric.getValue())) { + builder.setMetricF(metric.asFloat()); + } + PropertyValue time = context.getProperty(ATTR_TIME).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(time.getValue())) { + builder.setTime(time.asLong()); + } + PropertyValue state = context.getProperty(ATTR_STATE).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(state.getValue())) { + builder.setState(state.getValue()); + } + PropertyValue ttl = context.getProperty(ATTR_TTL).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(ttl.getValue())) { + builder.setTtl(ttl.asFloat()); + } + PropertyValue host = context.getProperty(ATTR_HOST).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(host.getValue())) { + builder.setHost(host.getValue()); + } + PropertyValue tags = context.getProperty(ATTR_TAGS).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(tags.getValue())) { + String[] splitTags = tags.getValue().split(","); + for (String splitTag : splitTags) { + builder.addTags(splitTag.trim()); + } + } + PropertyValue customAttributeValue; + for (PropertyDescriptor customProperty : customProperties) { + customAttributeValue = context.getProperty(customProperty).evaluateAttributeExpressions(flowFile); + if (StringUtils.isNotBlank(customAttributeValue.getValue())) { + builder.addAttributes(Proto.Attribute.newBuilder() + .setKey(customProperty.getName()) + .setValue(customAttributeValue.getValue()) + .build()); + } + } + return builder.build(); + } + } +} diff --git a/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..c53cea64f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.riemann.PutRiemann diff --git a/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/test/java/org/apache/nifi/processors/riemann/TestPutRiemann.java b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/test/java/org/apache/nifi/processors/riemann/TestPutRiemann.java new file mode 100644 index 0000000000..502e583684 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/test/java/org/apache/nifi/processors/riemann/TestPutRiemann.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.riemann; + +import com.aphyr.riemann.Proto; +import com.aphyr.riemann.client.IPromise; +import com.aphyr.riemann.client.RiemannClient; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyListOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestPutRiemann { + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + // Holds incoming events to Riemann + private Queue eventStream = new LinkedList(); + + @Before + public void clearEventStream() { + eventStream.clear(); + } + + private TestRunner getTestRunner() { + return getTestRunner(false); + } + + private TestRunner getTestRunner(final boolean failOnWrite) { + RiemannClient riemannClient = mock(RiemannClient.class); + when(riemannClient.sendEvents(anyListOf(Proto.Event.class))).thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + List events = (List) invocationOnMock.getArguments()[0]; + for (Proto.Event event : events) { + eventStream.add(event); + } + IPromise iPromise = mock(IPromise.class); + if (!failOnWrite) { + when(iPromise.deref(anyInt(), any(TimeUnit.class))).thenReturn(Proto.Msg.getDefaultInstance()); + } else { + when(iPromise.deref(anyInt(), any(TimeUnit.class))).thenReturn(null); + } + return iPromise; + } + }); + when(riemannClient.isConnected()).thenReturn(true); + PutRiemann riemannProcessor = new PutRiemann(); + riemannProcessor.riemannClient = riemannClient; + riemannProcessor.transport = PutRiemann.Transport.TCP; + + TestRunner runner = TestRunners.newTestRunner(riemannProcessor); + runner.setProperty(PutRiemann.RIEMANN_HOST, "localhost"); + runner.setProperty(PutRiemann.RIEMANN_PORT, "5555"); + runner.setProperty(PutRiemann.TRANSPORT_PROTOCOL, "TCP"); + runner.setProperty(PutRiemann.BATCH_SIZE, "100"); + runner.setProperty(PutRiemann.ATTR_SERVICE, "nifi-test-service"); + runner.setProperty(PutRiemann.ATTR_HOST, "${riemann.host}"); + runner.setProperty(PutRiemann.ATTR_TTL, "5"); + runner.setProperty(PutRiemann.ATTR_DESCRIPTION, "test"); + runner.setProperty(PutRiemann.ATTR_TAGS, "tag1, tag2, tag3"); + runner.setProperty(PutRiemann.ATTR_METRIC, "${riemann.metric}"); + runner.setProperty("custom-attribute-1", "${custom.attribute.1}"); + runner.setProperty("custom-attribute-2", "${custom.attribute.2}"); + runner.setProperty("custom-attribute-3", "${custom.attribute.3}"); + return runner; + } + + + @Test + public void testBasicEvent() { + TestRunner runner = getTestRunner(); + Map attributes = new HashMap<>(); + attributes.put("riemann.metric", "42"); + attributes.put("riemann.host", "basic-host"); + MockFlowFile flowFile = new MockFlowFile(1); + flowFile.putAttributes(attributes); + runner.enqueue(flowFile); + runner.run(); + runner.assertAllFlowFilesTransferred(PutRiemann.REL_SUCCESS); + + Proto.Event event = eventStream.remove(); + assertEquals("nifi-test-service", event.getService()); + assertTrue(5.0 == event.getTtl()); + assertTrue(42.0 == event.getMetricF()); + assertEquals("basic-host", event.getHost()); + assertEquals("test", event.getDescription()); + assertEquals(3, event.getTagsCount()); + assertTrue(event.getTagsList().contains("tag1")); + assertTrue(event.getTagsList().contains("tag2")); + assertTrue(event.getTagsList().contains("tag3")); + assertEquals(0, event.getAttributesCount()); + } + + @Test + public void testBatchedEvents() { + // (2 batches) + (1 remaining event) + int iterations = Integer.parseInt(PutRiemann.BATCH_SIZE.getDefaultValue()) * 2 + 1; + TestRunner runner = getTestRunner(); + + for (int i = 0; i < iterations; i++) { + Map attributes = new HashMap<>(); + attributes.put("riemann.metric", Float.toString(i)); + attributes.put("riemann.host", "batch-host"); + attributes.put("custom.attribute.1", "attr1"); + attributes.put("custom.attribute.2", "attr2"); + attributes.put("custom.attribute.3", "attr3"); + MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(attributes); + runner.enqueue(flowFile); + } + runner.run(3); + runner.assertAllFlowFilesTransferred(PutRiemann.REL_SUCCESS); + + for (int i = 0; i < iterations; i++) { + Proto.Event event = eventStream.remove(); + assertEquals("nifi-test-service", event.getService()); + assertTrue(5.0 == event.getTtl()); + assertTrue(i == event.getMetricF()); + assertEquals("batch-host", event.getHost()); + assertEquals("test", event.getDescription()); + assertEquals(3, event.getTagsCount()); + assertEquals(3, event.getAttributesCount()); + assertTrue(event.getTagsList().contains("tag1")); + assertTrue(event.getTagsList().contains("tag2")); + assertTrue(event.getTagsList().contains("tag3")); + } + } + + @Test + public void testInvalidEvents() { + TestRunner runner = getTestRunner(); + MockFlowFile flowFile = new MockFlowFile(1); + Map attributes = new HashMap<>(); + attributes.put("riemann.metric", "NOT A NUMBER"); + flowFile.putAttributes(attributes); + runner.enqueue(flowFile); + runner.run(); + runner.assertAllFlowFilesTransferred(PutRiemann.REL_FAILURE); + } + + + @Test(expected = AssertionError.class) + public void testFailedDeref() { + TestRunner runner = getTestRunner(true); + MockFlowFile flowFile = new MockFlowFile(1); + Map attributes = new HashMap<>(); + attributes.put("riemann.metric", "5"); + flowFile.putAttributes(attributes); + runner.enqueue(flowFile); + try { + runner.run(); + } catch (ProcessException e) { + runner.assertQueueNotEmpty(); + throw e; + } + } +} diff --git a/nifi-nar-bundles/nifi-riemann-bundle/pom.xml b/nifi-nar-bundles/nifi-riemann-bundle/pom.xml new file mode 100644 index 0000000000..d664e64469 --- /dev/null +++ b/nifi-nar-bundles/nifi-riemann-bundle/pom.xml @@ -0,0 +1,53 @@ + + + + + nifi-nar-bundles + org.apache.nifi + 0.4.2-SNAPSHOT + + 4.0.0 + + nifi-riemann-bundle + pom + + nifi-riemann-processors + nifi-riemann-nar + + + + clojars.org + http://clojars.org/repo + + + + + com.aphyr + riemann-java-client + 0.4.0 + + + + + + org.apache.nifi + nifi-riemann-processors + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index b2f637c782..96ab012dd9 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -42,13 +42,14 @@ nifi-language-translation-bundle nifi-mongodb-bundle nifi-flume-bundle - nifi-hbase-bundle + nifi-hbase-bundle nifi-ambari-bundle nifi-image-bundle nifi-avro-bundle nifi-couchbase-bundle nifi-azure-bundle nifi-ldap-iaa-providers-bundle + nifi-riemann-bundle diff --git a/pom.xml b/pom.xml index 2867abe3c4..2af004d6d3 100644 --- a/pom.xml +++ b/pom.xml @@ -980,6 +980,12 @@ language governing permissions and limitations under the License. --> 0.4.2-SNAPSHOT nar + + org.apache.nifi + nifi-riemann-nar + 0.4.2-SNAPSHOT + nar + org.apache.nifi nifi-hbase_1_1_2-client-service-nar