NIFI-987 Added Riemann (PutRiemann) Reporting

- Introduced nifi-riemann-bundle for future Riemann backed monitoring

- Added initial PutRiemann processor for writing events to Riemann
  using the Riemann batch client.

	- Values for events are provided using the NiFi expression language
		e.g. Metric -> ${latency.milliseconds:divide(1000)}
This commit is contained in:
ricky 2015-09-22 10:45:10 -04:00
parent ebcefaac23
commit c45060f703
10 changed files with 765 additions and 1 deletions

View File

@ -242,6 +242,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-hbase-nar</artifactId> <artifactId>nifi-hbase-nar</artifactId>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-riemann-nar</artifactId>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId> <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-riemann-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>0.4.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-riemann-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-riemann-processors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,19 @@
nifi-riemann-nar
Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())

View File

@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-riemann-bundle</artifactId>
<version>0.4.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-riemann-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,384 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.riemann;
import com.aphyr.riemann.Proto;
import com.aphyr.riemann.Proto.Event;
import com.aphyr.riemann.client.RiemannClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Tags({"riemann", "monitoring", "metrics"})
@DynamicProperty(name = "Custom Event Attribute", supportsExpressionLanguage = true,
description = "These values will be attached to the Riemann event as a custom attribute",
value = "Any value or expression")
@CapabilityDescription("Send events to Riemann (http://riemann.io) when FlowFiles pass through this processor. " +
"You can use events to notify Riemann that a FlowFile passed through, or you can attach a more " +
"meaningful metric, such as, the time a FlowFile took to get to this processor. All attributes attached to " +
"events support the NiFi Expression Language.")
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutRiemann extends AbstractProcessor {
protected enum Transport {
TCP, UDP
}
protected volatile RiemannClient riemannClient = null;
protected volatile Transport transport;
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Metrics successfully written to Riemann")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Metrics which failed to write to Riemann")
.build();
public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
.name("Riemann Address")
.description("Hostname of Riemann server")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
.name("Riemann Port")
.description("Port that Riemann is listening on")
.required(true)
.defaultValue("5555")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
.name("Transport Protocol")
.description("Transport protocol to speak to Riemann in")
.required(true)
.allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
.defaultValue("TCP")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("Batch size for incoming FlowFiles")
.required(false)
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
// Attributes Mappings
public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
.name("Service")
.description("Name of service associated to this event (e.g. FTP File Fetched)")
.required(false)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
.name("State")
.description("State of service associated to this event in string form (e.g. ok, warning, foo)")
.required(false)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
.name("Time")
.description("Time of event in unix epoch seconds (long), default: (current time)")
.required(false)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
.name("Host")
.description("A hostname associated to this event (e.g. nifi-app1)")
.required(false)
.defaultValue("${hostname()}")
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
.name("TTL")
.description("Floating point value in seconds until Riemann considers this event as \"expired\"")
.required(false)
.addValidator(Validator.VALID)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
.name("Metric")
.description("Floating point number associated to this event")
.required(false)
.addValidator(Validator.VALID)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
.name("Description")
.description("Description associated to the event")
.required(false)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
.name("Tags")
.description("Comma separated list of tags associated to the event")
.required(false)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("Timeout in milliseconds when writing events to Riemann")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.build();
private volatile List<PropertyDescriptor> customAttributes = new ArrayList<>();
private static final Set<Relationship> RELATIONSHIPS = new HashSet<>();
private static final List<PropertyDescriptor> LOCAL_PROPERTIES = new ArrayList<>();
private volatile int batchSize = -1;
private volatile long writeTimeout = 1000;
static {
RELATIONSHIPS.add(REL_SUCCESS);
RELATIONSHIPS.add(REL_FAILURE);
LOCAL_PROPERTIES.add(RIEMANN_HOST);
LOCAL_PROPERTIES.add(RIEMANN_PORT);
LOCAL_PROPERTIES.add(TRANSPORT_PROTOCOL);
LOCAL_PROPERTIES.add(TIMEOUT);
LOCAL_PROPERTIES.add(BATCH_SIZE);
LOCAL_PROPERTIES.add(ATTR_DESCRIPTION);
LOCAL_PROPERTIES.add(ATTR_SERVICE);
LOCAL_PROPERTIES.add(ATTR_STATE);
LOCAL_PROPERTIES.add(ATTR_METRIC);
LOCAL_PROPERTIES.add(ATTR_TTL);
LOCAL_PROPERTIES.add(ATTR_TAGS);
LOCAL_PROPERTIES.add(ATTR_HOST);
LOCAL_PROPERTIES.add(ATTR_TIME);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return LOCAL_PROPERTIES;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.required(false)
.dynamic(true)
.build();
}
@OnStopped
public final void cleanUpClient() {
if (riemannClient != null) {
this.riemannClient.close();
}
this.riemannClient = null;
this.batchSize = -1;
this.customAttributes.clear();
}
@OnScheduled
public void onScheduled(ProcessContext context) throws ProcessException {
if (batchSize == -1) {
batchSize = context.getProperty(BATCH_SIZE).asInteger();
}
if (riemannClient == null || !riemannClient.isConnected()) {
transport = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
String host = context.getProperty(RIEMANN_HOST).getValue().trim();
int port = context.getProperty(RIEMANN_PORT).asInteger();
writeTimeout = context.getProperty(TIMEOUT).asLong();
RiemannClient client = null;
try {
switch (transport) {
case TCP:
client = RiemannClient.tcp(host, port);
break;
case UDP:
client = RiemannClient.udp(host, port);
break;
}
client.connect();
riemannClient = client;
} catch (IOException e) {
if (client != null) {
client.close();
}
context.yield();
throw new ProcessException(String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
}
}
if (customAttributes.size() == 0) {
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
// only custom defined properties
if (!getSupportedPropertyDescriptors().contains(property.getKey())) {
customAttributes.add(property.getKey());
}
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// Check if the client is currently connected, as a previous trigger could have detected a failure
// in the connection.
if (riemannClient == null || !riemannClient.isConnected()) {
// clean up the client and attempt to re-initialize the processor
cleanUpClient();
onScheduled(context);
}
List<FlowFile> incomingFlowFiles = session.get(batchSize);
List<FlowFile> successfulFlowFiles = new ArrayList<>(incomingFlowFiles.size());
List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
for (FlowFile flowFile : incomingFlowFiles) {
try {
eventsQueue.add(FlowFileToEvent.fromAttributes(context, customAttributes, flowFile));
successfulFlowFiles.add(flowFile);
} catch (NumberFormatException e) {
getLogger().warn("Unable to create Riemann event.", e);
session.transfer(flowFile, REL_FAILURE);
}
}
try {
if (transport == Transport.TCP) {
Proto.Msg returnMessage = riemannClient.sendEvents(eventsQueue).deref(writeTimeout, TimeUnit.MILLISECONDS);
if (returnMessage == null) {
context.yield();
throw new ProcessException("Timed out writing to Riemann!");
}
} else {
riemannClient.sendEvents(eventsQueue);
}
riemannClient.flush();
session.transfer(successfulFlowFiles, REL_SUCCESS);
session.commit();
} catch (Exception e) {
context.yield();
session.transfer(incomingFlowFiles);
session.commit();
throw new ProcessException("Failed writing to Riemann\n" + e.getMessage());
}
}
/**
* Converts a FlowFile into a Riemann Protobuf Event
*/
private static class FlowFileToEvent {
protected static Event fromAttributes(ProcessContext context, List<PropertyDescriptor> customProperties,
FlowFile flowFile) {
Event.Builder builder = Event.newBuilder();
PropertyValue service = context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(service.getValue())) {
builder.setService(service.getValue());
}
PropertyValue description = context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(description.getValue())) {
builder.setDescription(description.getValue());
}
PropertyValue metric = context.getProperty(ATTR_METRIC).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(metric.getValue())) {
builder.setMetricF(metric.asFloat());
}
PropertyValue time = context.getProperty(ATTR_TIME).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(time.getValue())) {
builder.setTime(time.asLong());
}
PropertyValue state = context.getProperty(ATTR_STATE).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(state.getValue())) {
builder.setState(state.getValue());
}
PropertyValue ttl = context.getProperty(ATTR_TTL).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(ttl.getValue())) {
builder.setTtl(ttl.asFloat());
}
PropertyValue host = context.getProperty(ATTR_HOST).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(host.getValue())) {
builder.setHost(host.getValue());
}
PropertyValue tags = context.getProperty(ATTR_TAGS).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(tags.getValue())) {
String[] splitTags = tags.getValue().split(",");
for (String splitTag : splitTags) {
builder.addTags(splitTag.trim());
}
}
PropertyValue customAttributeValue;
for (PropertyDescriptor customProperty : customProperties) {
customAttributeValue = context.getProperty(customProperty).evaluateAttributeExpressions(flowFile);
if (StringUtils.isNotBlank(customAttributeValue.getValue())) {
builder.addAttributes(Proto.Attribute.newBuilder()
.setKey(customProperty.getName())
.setValue(customAttributeValue.getValue())
.build());
}
}
return builder.build();
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.riemann.PutRiemann

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.riemann;
import com.aphyr.riemann.Proto;
import com.aphyr.riemann.client.IPromise;
import com.aphyr.riemann.client.RiemannClient;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyListOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPutRiemann {
@Rule
public final ExpectedException expectedException = ExpectedException.none();
// Holds incoming events to Riemann
private Queue<Proto.Event> eventStream = new LinkedList<Proto.Event>();
@Before
public void clearEventStream() {
eventStream.clear();
}
private TestRunner getTestRunner() {
return getTestRunner(false);
}
private TestRunner getTestRunner(final boolean failOnWrite) {
RiemannClient riemannClient = mock(RiemannClient.class);
when(riemannClient.sendEvents(anyListOf(Proto.Event.class))).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
List<Proto.Event> events = (List<Proto.Event>) invocationOnMock.getArguments()[0];
for (Proto.Event event : events) {
eventStream.add(event);
}
IPromise iPromise = mock(IPromise.class);
if (!failOnWrite) {
when(iPromise.deref(anyInt(), any(TimeUnit.class))).thenReturn(Proto.Msg.getDefaultInstance());
} else {
when(iPromise.deref(anyInt(), any(TimeUnit.class))).thenReturn(null);
}
return iPromise;
}
});
when(riemannClient.isConnected()).thenReturn(true);
PutRiemann riemannProcessor = new PutRiemann();
riemannProcessor.riemannClient = riemannClient;
riemannProcessor.transport = PutRiemann.Transport.TCP;
TestRunner runner = TestRunners.newTestRunner(riemannProcessor);
runner.setProperty(PutRiemann.RIEMANN_HOST, "localhost");
runner.setProperty(PutRiemann.RIEMANN_PORT, "5555");
runner.setProperty(PutRiemann.TRANSPORT_PROTOCOL, "TCP");
runner.setProperty(PutRiemann.BATCH_SIZE, "100");
runner.setProperty(PutRiemann.ATTR_SERVICE, "nifi-test-service");
runner.setProperty(PutRiemann.ATTR_HOST, "${riemann.host}");
runner.setProperty(PutRiemann.ATTR_TTL, "5");
runner.setProperty(PutRiemann.ATTR_DESCRIPTION, "test");
runner.setProperty(PutRiemann.ATTR_TAGS, "tag1, tag2, tag3");
runner.setProperty(PutRiemann.ATTR_METRIC, "${riemann.metric}");
runner.setProperty("custom-attribute-1", "${custom.attribute.1}");
runner.setProperty("custom-attribute-2", "${custom.attribute.2}");
runner.setProperty("custom-attribute-3", "${custom.attribute.3}");
return runner;
}
@Test
public void testBasicEvent() {
TestRunner runner = getTestRunner();
Map<String, String> attributes = new HashMap<>();
attributes.put("riemann.metric", "42");
attributes.put("riemann.host", "basic-host");
MockFlowFile flowFile = new MockFlowFile(1);
flowFile.putAttributes(attributes);
runner.enqueue(flowFile);
runner.run();
runner.assertAllFlowFilesTransferred(PutRiemann.REL_SUCCESS);
Proto.Event event = eventStream.remove();
assertEquals("nifi-test-service", event.getService());
assertTrue(5.0 == event.getTtl());
assertTrue(42.0 == event.getMetricF());
assertEquals("basic-host", event.getHost());
assertEquals("test", event.getDescription());
assertEquals(3, event.getTagsCount());
assertTrue(event.getTagsList().contains("tag1"));
assertTrue(event.getTagsList().contains("tag2"));
assertTrue(event.getTagsList().contains("tag3"));
assertEquals(0, event.getAttributesCount());
}
@Test
public void testBatchedEvents() {
// (2 batches) + (1 remaining event)
int iterations = Integer.parseInt(PutRiemann.BATCH_SIZE.getDefaultValue()) * 2 + 1;
TestRunner runner = getTestRunner();
for (int i = 0; i < iterations; i++) {
Map<String, String> attributes = new HashMap<>();
attributes.put("riemann.metric", Float.toString(i));
attributes.put("riemann.host", "batch-host");
attributes.put("custom.attribute.1", "attr1");
attributes.put("custom.attribute.2", "attr2");
attributes.put("custom.attribute.3", "attr3");
MockFlowFile flowFile = new MockFlowFile(i);
flowFile.putAttributes(attributes);
runner.enqueue(flowFile);
}
runner.run(3);
runner.assertAllFlowFilesTransferred(PutRiemann.REL_SUCCESS);
for (int i = 0; i < iterations; i++) {
Proto.Event event = eventStream.remove();
assertEquals("nifi-test-service", event.getService());
assertTrue(5.0 == event.getTtl());
assertTrue(i == event.getMetricF());
assertEquals("batch-host", event.getHost());
assertEquals("test", event.getDescription());
assertEquals(3, event.getTagsCount());
assertEquals(3, event.getAttributesCount());
assertTrue(event.getTagsList().contains("tag1"));
assertTrue(event.getTagsList().contains("tag2"));
assertTrue(event.getTagsList().contains("tag3"));
}
}
@Test
public void testInvalidEvents() {
TestRunner runner = getTestRunner();
MockFlowFile flowFile = new MockFlowFile(1);
Map<String, String> attributes = new HashMap<>();
attributes.put("riemann.metric", "NOT A NUMBER");
flowFile.putAttributes(attributes);
runner.enqueue(flowFile);
runner.run();
runner.assertAllFlowFilesTransferred(PutRiemann.REL_FAILURE);
}
@Test(expected = AssertionError.class)
public void testFailedDeref() {
TestRunner runner = getTestRunner(true);
MockFlowFile flowFile = new MockFlowFile(1);
Map<String, String> attributes = new HashMap<>();
attributes.put("riemann.metric", "5");
flowFile.putAttributes(attributes);
runner.enqueue(flowFile);
try {
runner.run();
} catch (ProcessException e) {
runner.assertQueueNotEmpty();
throw e;
}
}
}

View File

@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-nar-bundles</artifactId>
<groupId>org.apache.nifi</groupId>
<version>0.4.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-riemann-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-riemann-processors</module>
<module>nifi-riemann-nar</module>
</modules>
<repositories>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.aphyr</groupId>
<artifactId>riemann-java-client</artifactId>
<version>0.4.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-riemann-processors</artifactId>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -42,13 +42,14 @@
<module>nifi-language-translation-bundle</module> <module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module> <module>nifi-mongodb-bundle</module>
<module>nifi-flume-bundle</module> <module>nifi-flume-bundle</module>
<module>nifi-hbase-bundle</module> <module>nifi-hbase-bundle</module>
<module>nifi-ambari-bundle</module> <module>nifi-ambari-bundle</module>
<module>nifi-image-bundle</module> <module>nifi-image-bundle</module>
<module>nifi-avro-bundle</module> <module>nifi-avro-bundle</module>
<module>nifi-couchbase-bundle</module> <module>nifi-couchbase-bundle</module>
<module>nifi-azure-bundle</module> <module>nifi-azure-bundle</module>
<module>nifi-ldap-iaa-providers-bundle</module> <module>nifi-ldap-iaa-providers-bundle</module>
<module>nifi-riemann-bundle</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>

View File

@ -980,6 +980,12 @@ language governing permissions and limitations under the License. -->
<version>0.4.2-SNAPSHOT</version> <version>0.4.2-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-riemann-nar</artifactId>
<version>0.4.2-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId> <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>