Adding influxdb emitter as a contrib extension (#7717)

* Adding influxdb emitter as a contrib extension

* addressing code review comments
This commit is contained in:
awelsh93 2019-05-23 19:11:48 +01:00 committed by Himanshu
parent 26fad7e06a
commit 6964ac23a2
11 changed files with 1060 additions and 0 deletions

View File

@ -324,6 +324,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-influx-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-influxdb-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-kafka-eight-simple-consumer</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:kafka-emitter</argument>

View File

@ -0,0 +1,75 @@
---
layout: doc_page
title: "InfluxDB Emitter"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# InfluxDB Emitter
To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-influxdb-emitter` extension.
## Introduction
This extension emits druid metrics to [InfluxDB](https://www.influxdata.com/time-series-platform/influxdb/) over HTTP. Currently this emitter only emits service metric events to InfluxDB (See [Druid metrics](../../operations/metrics.html) for a list of metrics).
When a metric event is fired it is added to a queue of events. After a configurable amount of time, the events on the queue are transformed to InfluxDB's line protocol
and POSTed to the InfluxDB HTTP API. The entire queue is flushed at this point. The queue is also flushed as the emitter is shutdown.
Note that authentication and authorization must be [enabled](https://docs.influxdata.com/influxdb/v1.7/administration/authentication_and_authorization/) on the InfluxDB server.
## Configuration
All the configuration parameters for the influxdb emitter are under `druid.emitter.influxdb`.
|Property|Description|Required?|Default|
|--------|-----------|---------|-------|
|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.Max_Value(=2^31-1)|
|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
|`druid.emitter.influxdb.flushDelay`|How long (in milliseconds) the scheduled method will wait until it first runs.|No|60000|
|`druid.emitter.influxdb.influxdbUserName`|The username for authenticating with the InfluxDB database.|Yes|N/A|
|`druid.emitter.influxdb.influxdbPassword`|The password of the database authorized user|Yes|N/A|
|`druid.emitter.influxdb.dimensionWhitelist`|A whitelist of metric dimensions to include as tags|No|`["dataSource","type","numMetrics","numDimensions","threshold","dimension","taskType","taskStatus","tier"]`|
## InfluxDB Line Protocol
An example of how this emitter parses a Druid metric event into InfluxDB's [line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_reference/) is given here:
The syntax of the line protocol is :
`<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]`
where timestamp is in nano-seconds since epoch.
A typical service metric event as recorded by Druid's logging emitter is: `Event [{"feed":"metrics","timestamp":"2017-10-31T09:09:06.857Z","service":"druid/historical","host":"historical001:8083","version":"0.11.0-SNAPSHOT","metric":"query/cache/total/hits","value":34787256}]`.
This event is parsed into line protocol according to these rules:
* The measurement becomes druid_query since query is the first part of the metric.
* The tags are service=druid/historical, hostname=historical001, metric=druid_cache_total. (The metric tag is the middle part of the druid metric separated with _ and preceded by druid_. Another example would be if an event has metric=query/time then there is no middle part and hence no metric tag)
* The field is druid_hits since this is the last part of the metric.
This gives the following String which can be POSTed to InfluxDB: `"druid_query,service=druid/historical,hostname=historical001,metric=druid_cache_total druid_hits=34787256 1509440946857000000"`
The InfluxDB emitter has a white list of dimensions
which will be added as a tag to the line protocol string if the metric has a dimension from the white list.
The value of the dimension is sanitized such that every occurence of a dot or whitespace is replaced with a `_` .

View File

@ -96,6 +96,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)|
|druid-moving-average-query|Support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.|[link](../development/extensions-contrib/moving-average-query.html)|
|druid-influxdb-emitter|InfluxDB metrics emitter|[link](../development/extensions-contrib/influxdb-emitter.html)|
## Promoting Community Extension to Core Extension

View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.druid.extensions.contrib</groupId>
<artifactId>druid-influxdb-emitter</artifactId>
<name>druid-influxdb-emitter</name>
<description>influxdb-emitter</description>
<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>0.15.0-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.influxdb;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
public class InfluxdbEmitter implements Emitter
{
private static final Logger log = new Logger(InfluxdbEmitter.class);
private final HttpClient influxdbClient;
private final InfluxdbEmitterConfig influxdbEmitterConfig;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ScheduledExecutorService exec = ScheduledExecutors.fixed(1, "InfluxdbEmitter-%s");
private final ImmutableSet dimensionWhiteList;
private final LinkedBlockingQueue<ServiceMetricEvent> eventsQueue;
private static final Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");
public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
{
this.influxdbEmitterConfig = influxdbEmitterConfig;
this.influxdbClient = HttpClientBuilder.create().build();
this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
log.info("constructed influxdb emitter");
}
@Override
public void start()
{
synchronized (started) {
if (!started.get()) {
exec.scheduleAtFixedRate(
() -> transformAndSendToInfluxdb(eventsQueue),
influxdbEmitterConfig.getFlushDelay(),
influxdbEmitterConfig.getFlushPeriod(),
TimeUnit.MILLISECONDS
);
started.set(true);
}
}
}
@Override
public void emit(Event event)
{
if (event instanceof ServiceMetricEvent) {
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
try {
eventsQueue.put(metricEvent);
}
catch (InterruptedException exception) {
log.error(exception, "Failed to add metricEvent to events queue.");
Thread.currentThread().interrupt();
}
}
}
public void postToInflux(String payload)
{
HttpPost post = new HttpPost(
"http://" + influxdbEmitterConfig.getHostname()
+ ":" + influxdbEmitterConfig.getPort()
+ "/write?db=" + influxdbEmitterConfig.getDatabaseName()
+ "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
+ "&p=" + influxdbEmitterConfig.getInfluxdbPassword()
);
post.setEntity(new StringEntity(payload, ContentType.DEFAULT_TEXT));
post.setHeader("Content-Type", "application/x-www-form-urlencoded");
try {
influxdbClient.execute(post);
}
catch (IOException ex) {
log.info(ex, "Failed to post events to InfluxDB.");
}
finally {
post.releaseConnection();
}
}
public String transformForInfluxSystems(ServiceMetricEvent event)
{
// split Druid metric on slashes and join middle parts (if any) with "_"
String[] parts = getValue("metric", event).split("/");
String metric = String.join(
"_",
Arrays.asList(
Arrays.copyOfRange(
parts,
1,
parts.length - 1
)
)
);
// measurement
StringBuilder payload = new StringBuilder("druid_");
payload.append(parts[0]);
// tags
StringBuilder tag = new StringBuilder(",service=");
tag.append(getValue("service", event));
String metricTag = parts.length == 2 ? "" : ",metric=druid_" + metric;
tag.append(metricTag);
tag.append(StringUtils.format(",hostname=%s", getValue("host", event).split(":")[0]));
ImmutableSet<String> dimNames = ImmutableSet.copyOf(event.getUserDims().keySet());
for (String dimName : dimNames) {
if (this.dimensionWhiteList.contains(dimName)) {
tag.append(StringUtils.format(",%1$s=%2$s", dimName, sanitize(String.valueOf(event.getUserDims().get(dimName)))));
}
}
payload.append(tag);
// fields
payload.append(StringUtils.format(" druid_%1$s=%2$s", parts[parts.length - 1], getValue("value", event)));
// timestamp
payload.append(StringUtils.format(" %d\n", event.getCreatedTime().getMillis() * 1000000));
return payload.toString();
}
private static String sanitize(String namespace)
{
return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
}
public String getValue(String key, ServiceMetricEvent event)
{
switch (key) {
case "service":
return event.getService();
case "eventType":
return event.getClass().getSimpleName();
case "metric":
return event.getMetric();
case "feed":
return event.getFeed();
case "host":
return event.getHost();
case "value":
return event.getValue().toString();
default:
return key;
}
}
@Override
public void flush() throws IOException
{
if (started.get()) {
transformAndSendToInfluxdb(eventsQueue);
}
}
@Override
public void close() throws IOException
{
flush();
log.info("Closing [%s]", this.getClass().getName());
started.set(false);
exec.shutdownNow();
}
public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> eventsQueue)
{
StringBuilder payload = new StringBuilder();
int initialQueueSize = eventsQueue.size();
for (int i = 0; i < initialQueueSize; i++) {
payload.append(transformForInfluxSystems(eventsQueue.poll()));
}
postToInflux(payload.toString());
}
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.influxdb;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.logger.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
public class InfluxdbEmitterConfig
{
private static final int DEFAULT_PORT = 8086;
private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
@JsonProperty
private final String hostname;
@JsonProperty
private final Integer port;
@JsonProperty
private final String databaseName;
@JsonProperty
private final Integer maxQueueSize;
@JsonProperty
private final Integer flushPeriod;
@JsonProperty
private final Integer flushDelay;
@JsonProperty
private final String influxdbUserName;
@JsonProperty
private final String influxdbPassword;
@JsonProperty
private final ImmutableSet<String> dimensionWhitelist;
private static Logger log = new Logger(InfluxdbEmitterConfig.class);
@JsonCreator
public InfluxdbEmitterConfig(
@JsonProperty("hostname") String hostname,
@JsonProperty("port") Integer port,
@JsonProperty("databaseName") String databaseName,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("flushPeriod") Integer flushPeriod,
@JsonProperty("flushDelay") Integer flushDelay,
@JsonProperty("influxdbUserName") String influxdbUserName,
@JsonProperty("influxdbPassword") String influxdbPassword,
@JsonProperty("dimensionWhitelist") Set<String> dimensionWhitelist
)
{
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = port == null ? DEFAULT_PORT : port;
this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
this.flushDelay = flushDelay == null ? DEFAULT_FLUSH_PERIOD : flushDelay;
this.influxdbUserName = Preconditions.checkNotNull(influxdbUserName, "influxdbUserName can not be null");
this.influxdbPassword = Preconditions.checkNotNull(influxdbPassword, "influxdbPassword can not be null");
this.dimensionWhitelist = dimensionWhitelist == null ? ImmutableSet.copyOf(DEFAULT_DIMENSION_WHITELIST) : ImmutableSet.copyOf(dimensionWhitelist);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof InfluxdbEmitterConfig)) {
return false;
}
InfluxdbEmitterConfig that = (InfluxdbEmitterConfig) o;
if (getPort() != that.getPort()) {
return false;
}
if (!getHostname().equals(that.getHostname())) {
return false;
}
if (!getDatabaseName().equals(that.getDatabaseName())) {
return false;
}
if (getFlushPeriod() != that.getFlushPeriod()) {
return false;
}
if (getMaxQueueSize() != that.getMaxQueueSize()) {
return false;
}
if (getFlushDelay() != that.getFlushDelay()) {
return false;
}
if (!getInfluxdbUserName().equals(that.getInfluxdbUserName())) {
return false;
}
if (!getInfluxdbPassword().equals(that.getInfluxdbPassword())) {
return false;
}
if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = getHostname().hashCode();
result = 31 * result + getPort();
result = 31 * result + getDatabaseName().hashCode();
result = 31 * result + getFlushPeriod();
result = 31 * result + getMaxQueueSize();
result = 31 * result + getFlushDelay();
result = 31 * result + getInfluxdbUserName().hashCode();
result = 31 * result + getInfluxdbPassword().hashCode();
result = 31 * result + getDimensionWhitelist().hashCode();
return result;
}
@JsonProperty
public String getHostname()
{
return hostname;
}
@JsonProperty
public int getPort()
{
return port;
}
@JsonProperty
public String getDatabaseName()
{
return databaseName;
}
@JsonProperty
public int getFlushPeriod()
{
return flushPeriod;
}
@JsonProperty
public int getMaxQueueSize()
{
return maxQueueSize;
}
@JsonProperty
public int getFlushDelay()
{
return flushDelay;
}
@JsonProperty
public String getInfluxdbUserName()
{
return influxdbUserName;
}
@JsonProperty
public String getInfluxdbPassword()
{
return influxdbPassword;
}
@JsonProperty
public ImmutableSet<String> getDimensionWhitelist()
{
return dimensionWhitelist;
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.influxdb;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import java.util.Collections;
import java.util.List;
public class InfluxdbEmitterModule implements DruidModule
{
private static final String EMITTER_TYPE = "influxdb";
private static final Logger log = new Logger(InfluxdbEmitterModule.class);
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.EMPTY_LIST;
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, InfluxdbEmitterConfig.class);
}
@Provides
@ManageLifecycle
@Named(EMITTER_TYPE)
public Emitter getEmitter(InfluxdbEmitterConfig influxdbEmitterConfig, ObjectMapper mapper)
{
return new InfluxdbEmitter(influxdbEmitterConfig);
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.druid.emitter.influxdb.InfluxdbEmitterModule

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.influxdb;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
public class InfluxdbEmitterConfigTest
{
private ObjectMapper mapper = new DefaultObjectMapper();
private InfluxdbEmitterConfig influxdbEmitterConfig;
@Before
public void setUp()
{
mapper.setInjectableValues(new InjectableValues.Std().addValue(
ObjectMapper.class,
new DefaultObjectMapper()
));
influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
}
@Test
public void testInfluxdbEmitterConfigObjectsAreDifferent() throws IOException
{
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8080,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
Assert.assertNotEquals(influxdbEmitterConfig, influxdbEmitterConfigComparison);
}
@Test(expected = NullPointerException.class)
public void testConfigWithNullHostname() throws IOException
{
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
null,
8080,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
}
@Test
public void testConfigWithNullPort() throws IOException
{
InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig(
"localhost",
null,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
int expectedPort = 8086;
Assert.assertEquals(expectedPort, influxdbEmitterConfig.getPort());
}
@Test
public void testEqualsMethod()
{
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
Assert.assertTrue(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison));
}
@Test
public void testEqualsMethodWithNotEqualConfigs()
{
InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
10000,
"adam",
"password",
null
);
Assert.assertFalse(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison));
}
@Test(expected = NullPointerException.class)
public void testConfigWithNullInfluxdbUserName() throws IOException
{
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
null,
"password",
null
);
}
@Test(expected = NullPointerException.class)
public void testConfigWithNullInfluxdbPassword() throws IOException
{
InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
null,
null
);
}
@Test
public void testConfigWithNullDimensionWhitelist()
{
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier"));
Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
}
@Test
public void testConfigWithDimensionWhitelist()
{
InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
ImmutableSet.of("dataSource", "taskType")
);
ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "taskType"));
Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.influxdb;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class InfluxdbEmitterTest
{
private ServiceMetricEvent event;
@Before
public void setUp()
{
DateTime date = new DateTime(2017,
10,
30,
10,
00,
DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
String metric = "metric/te/st/value";
Number value = 1234;
ImmutableMap<String, String> serviceDims = ImmutableMap.of(
"service",
"druid/historical",
"host",
"localhost",
"version",
"0.10.0"
);
ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
builder.setDimension("nonWhiteListedDim", "test");
builder.setDimension("dataSource", "test_datasource");
ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
}
@Test
public void testTransformForInfluxWithLongMetric()
{
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
String expected =
"druid_metric,service=druid/historical,metric=druid_te_st,hostname=localhost,dataSource=test_datasource druid_value=1234 1509357600000000000"
+ "\n";
String actual = influxdbEmitter.transformForInfluxSystems(event);
Assert.assertEquals(expected, actual);
}
@Test
public void testTransformForInfluxWithShortMetric()
{
DateTime date = new DateTime(2017,
10,
30,
10,
00,
DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
String metric = "metric/time";
Number value = 1234;
ImmutableMap<String, String> serviceDims = ImmutableMap.of(
"service",
"druid/historical",
"host",
"localhost",
"version",
"0.10.0"
);
ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
String expected = "druid_metric,service=druid/historical,hostname=localhost druid_time=1234 1509357600000000000"
+ "\n";
String actual = influxdbEmitter.transformForInfluxSystems(event);
Assert.assertEquals(expected, actual);
}
@Test
public void testMetricIsInDimensionWhitelist()
{
DateTime date = new DateTime(2017,
10,
30,
10,
00,
DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
String metric = "metric/time";
Number value = 1234;
ImmutableMap<String, String> serviceDims = ImmutableMap.of(
"service",
"druid/historical",
"host",
"localhost",
"version",
"0.10.0"
);
ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
builder.setDimension("dataSource", "wikipedia");
builder.setDimension("taskType", "index");
ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
ImmutableSet.of("dataSource")
);
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia druid_time=1234 1509357600000000000"
+ "\n";
String actual = influxdbEmitter.transformForInfluxSystems(event);
Assert.assertEquals(expected, actual);
}
@Test
public void testMetricIsInDefaultDimensionWhitelist()
{
DateTime date = new DateTime(2017,
10,
30,
10,
00,
DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
String metric = "metric/time";
Number value = 1234;
ImmutableMap<String, String> serviceDims = ImmutableMap.of(
"service",
"druid/historical",
"host",
"localhost",
"version",
"0.10.0"
);
ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
builder.setDimension("dataSource", "wikipedia");
builder.setDimension("taskType", "index");
ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
"localhost",
8086,
"dbname",
10000,
15000,
30000,
"adam",
"password",
null
);
InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia,taskType=index druid_time=1234 1509357600000000000"
+ "\n";
String actual = influxdbEmitter.transformForInfluxSystems(event);
Assert.assertEquals(expected, actual);
}
}

View File

@ -175,6 +175,7 @@
<module>extensions-contrib/momentsketch</module>
<module>extensions-contrib/moving-average-query</module>
<module>extensions-contrib/tdigestsketch</module>
<module>extensions-contrib/influxdb-emitter</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>