mirror of https://github.com/apache/druid.git
Opentsdb emitter extension (#5380)
* opentsdb emitter extension * doc for opentsdb emitter extension * update opentsdb emitter doc * add the ms unit to the constant name * add a configurable event limit * fix version to 0.13.0-SNAPSHOT * using a thread to consume metric event * rename method and parameter
This commit is contained in:
parent
b234a119ac
commit
aa7aee53ce
|
@ -224,6 +224,8 @@
|
||||||
<argument>-c</argument>
|
<argument>-c</argument>
|
||||||
<argument>io.druid.extensions.contrib:graphite-emitter</argument>
|
<argument>io.druid.extensions.contrib:graphite-emitter</argument>
|
||||||
<argument>-c</argument>
|
<argument>-c</argument>
|
||||||
|
<argument>io.druid.extensions.contrib:druid-opentsdb-emitter</argument>
|
||||||
|
<argument>-c</argument>
|
||||||
<argument>io.druid.extensions.contrib:druid-orc-extensions</argument>
|
<argument>io.druid.extensions.contrib:druid-orc-extensions</argument>
|
||||||
<argument>-c</argument>
|
<argument>-c</argument>
|
||||||
<argument>io.druid.extensions.contrib:druid-parquet-extensions</argument>
|
<argument>io.druid.extensions.contrib:druid-parquet-extensions</argument>
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
---
|
||||||
|
layout: doc_page
|
||||||
|
---
|
||||||
|
|
||||||
|
# Opentsdb Emitter
|
||||||
|
|
||||||
|
To use this extension, make sure to [include](../../operations/including-extensions.html) `opentsdb-emitter` extension.
|
||||||
|
|
||||||
|
## Introduction
|
||||||
|
|
||||||
|
This extension emits druid metrics to [OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter only emits service metric events to OpenTSDB (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
All the configuration parameters for the opentsdb emitter are under `druid.emitter.opentsdb`.
|
||||||
|
|
||||||
|
|property|description|required?|default|
|
||||||
|
|--------|-----------|---------|-------|
|
||||||
|
|`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none|
|
||||||
|
|`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none|
|
||||||
|
|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in milliseconds).|no|2000|
|
||||||
|
|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000|
|
||||||
|
|`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will be sent as one batch)|no|100|
|
||||||
|
|`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to buffer events.|no|1000|
|
||||||
|
|`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics and dimensions for every Druid metric|no|./src/main/resources/defaultMetrics.json|
|
||||||
|
|
||||||
|
### Druid to OpenTSDB Event Converter
|
||||||
|
|
||||||
|
The opentsdb emitter will send only the desired metrics and dimensions which is defined in a JSON file.
|
||||||
|
If the user does not specify their own JSON file, a default file is used. All metrics are expected to be configured in the JSON file. Metrics which are not configured will be logged.
|
||||||
|
Desired metrics and dimensions is organized using the following schema:`<druid metric name> : [ <dimension list> ]`<br />
|
||||||
|
e.g.
|
||||||
|
|
||||||
|
```json
|
||||||
|
"query/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
For most use-cases, the default configuration is sufficient.
|
||||||
|
|
|
@ -71,6 +71,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|
||||||
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
|
||||||
|kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)|
|
|kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)|
|
||||||
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|
||||||
|
|druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)|
|
||||||
|
|
||||||
## Promoting Community Extension to Core Extension
|
## Promoting Community Extension to Core Extension
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
|
||||||
|
<!--
|
||||||
|
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
~ or more contributor license agreements. See the NOTICE file
|
||||||
|
~ distributed with this work for additional information
|
||||||
|
~ regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
~ to you under the Apache License, Version 2.0 (the
|
||||||
|
~ "License"); you may not use this file except in compliance
|
||||||
|
~ with the License. You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing,
|
||||||
|
~ software distributed under the License is distributed on an
|
||||||
|
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
~ KIND, either express or implied. See the License for the
|
||||||
|
~ specific language governing permissions and limitations
|
||||||
|
~ under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<groupId>io.druid.extensions.contrib</groupId>
|
||||||
|
<artifactId>druid-opentsdb-emitter</artifactId>
|
||||||
|
<name>druid-opentsdb-emitter</name>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid</artifactId>
|
||||||
|
<version>0.13.0-SNAPSHOT</version>
|
||||||
|
<relativePath>../../pom.xml</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-api</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-server</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.sun.jersey</groupId>
|
||||||
|
<artifactId>jersey-client</artifactId>
|
||||||
|
<version>${jersey.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Tests -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,109 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
import io.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||||
|
import io.druid.java.util.common.ISE;
|
||||||
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
public class EventConverter
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(EventConverter.class);
|
||||||
|
private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
|
||||||
|
|
||||||
|
private final Map<String, Set<String>> metricMap;
|
||||||
|
|
||||||
|
public EventConverter(ObjectMapper mapper, String metricMapPath)
|
||||||
|
{
|
||||||
|
metricMap = readMap(mapper, metricMapPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String sanitize(String metric)
|
||||||
|
{
|
||||||
|
return WHITESPACE.matcher(metric.trim()).replaceAll("_").replaceAll("/", ".");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function will convert a druid event to a opentsdb event.
|
||||||
|
* Also this function acts as a filter. It returns <tt>null</tt> if the event is not suppose to be emitted to Opentsdb.
|
||||||
|
* And it will filter out dimensions which is not suppose to be emitted.
|
||||||
|
*
|
||||||
|
* @param serviceMetricEvent Druid event ot type {@link ServiceMetricEvent}
|
||||||
|
*
|
||||||
|
* @return {@link OpentsdbEvent} or <tt>null</tt>
|
||||||
|
*/
|
||||||
|
public OpentsdbEvent convert(ServiceMetricEvent serviceMetricEvent)
|
||||||
|
{
|
||||||
|
String metric = serviceMetricEvent.getMetric();
|
||||||
|
if (!metricMap.containsKey(metric)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
long timestamp = serviceMetricEvent.getCreatedTime().getMillis() / 1000L;
|
||||||
|
Number value = serviceMetricEvent.getValue();
|
||||||
|
|
||||||
|
Map<String, Object> tags = new HashMap<>();
|
||||||
|
String service = serviceMetricEvent.getService();
|
||||||
|
String host = serviceMetricEvent.getHost();
|
||||||
|
tags.put("service", service);
|
||||||
|
tags.put("host", host);
|
||||||
|
|
||||||
|
Map<String, Object> userDims = serviceMetricEvent.getUserDims();
|
||||||
|
for (String dim : metricMap.get(metric)) {
|
||||||
|
if (userDims.containsKey(dim)) {
|
||||||
|
tags.put(dim, userDims.get(dim));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new OpentsdbEvent(sanitize(metric), timestamp, value, tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Set<String>> readMap(ObjectMapper mapper, String metricMapPath)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
InputStream is;
|
||||||
|
if (Strings.isNullOrEmpty(metricMapPath)) {
|
||||||
|
log.info("Using default metric map");
|
||||||
|
is = this.getClass().getClassLoader().getResourceAsStream("defaultMetrics.json");
|
||||||
|
} else {
|
||||||
|
log.info("Using default metric map located at [%s]", metricMapPath);
|
||||||
|
is = new FileInputStream(new File(metricMapPath));
|
||||||
|
}
|
||||||
|
return mapper.reader(new TypeReference<Map<String, Set<String>>>()
|
||||||
|
{
|
||||||
|
}).readValue(is);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new ISE(e, "Failed to parse metrics and dimensions");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
import io.druid.java.util.emitter.core.Emitter;
|
||||||
|
import io.druid.java.util.emitter.core.Event;
|
||||||
|
import io.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class OpentsdbEmitter implements Emitter
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(OpentsdbEmitter.class);
|
||||||
|
|
||||||
|
private final OpentsdbSender sender;
|
||||||
|
private final EventConverter converter;
|
||||||
|
|
||||||
|
public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper)
|
||||||
|
{
|
||||||
|
this.sender = new OpentsdbSender(
|
||||||
|
config.getHost(),
|
||||||
|
config.getPort(),
|
||||||
|
config.getConnectionTimeout(),
|
||||||
|
config.getReadTimeout(),
|
||||||
|
config.getFlushThreshold(),
|
||||||
|
config.getMaxQueueSize()
|
||||||
|
);
|
||||||
|
this.converter = new EventConverter(mapper, config.getMetricMapPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void emit(Event event)
|
||||||
|
{
|
||||||
|
if (event instanceof ServiceMetricEvent) {
|
||||||
|
OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent) event);
|
||||||
|
if (opentsdbEvent != null) {
|
||||||
|
sender.enqueue(opentsdbEvent);
|
||||||
|
} else {
|
||||||
|
log.debug(
|
||||||
|
"Metric=[%s] has not been configured to be emitted to opentsdb",
|
||||||
|
((ServiceMetricEvent) event).getMetric()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() throws IOException
|
||||||
|
{
|
||||||
|
sender.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
sender.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,158 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
public class OpentsdbEmitterConfig
|
||||||
|
{
|
||||||
|
private static final int DEFAULT_FLUSH_THRESHOLD = 100;
|
||||||
|
private static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
|
||||||
|
private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000;
|
||||||
|
private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String host;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int port;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int connectionTimeout;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int readTimeout;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int flushThreshold;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final int maxQueueSize;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String metricMapPath;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public OpentsdbEmitterConfig(
|
||||||
|
@JsonProperty("host") String host,
|
||||||
|
@JsonProperty("port") Integer port,
|
||||||
|
@JsonProperty("connectionTimeout") Integer connectionTimeout,
|
||||||
|
@JsonProperty("readTimeout") Integer readTimeout,
|
||||||
|
@JsonProperty("flushThreshold") Integer flushThreshold,
|
||||||
|
@JsonProperty("maxQueueSize") Integer maxQueueSize,
|
||||||
|
@JsonProperty("metricMapPath") String metricMapPath
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.host = Preconditions.checkNotNull(host, "host can not be null.");
|
||||||
|
this.port = Preconditions.checkNotNull(port, "port can not be null");
|
||||||
|
this.connectionTimeout = (connectionTimeout == null || connectionTimeout < 0)
|
||||||
|
? DEFAULT_CONNECTION_TIMEOUT_MILLIS
|
||||||
|
: connectionTimeout;
|
||||||
|
this.readTimeout =
|
||||||
|
(readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS : readTimeout;
|
||||||
|
this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ? DEFAULT_FLUSH_THRESHOLD : flushThreshold;
|
||||||
|
this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ? DEFAULT_MAX_QUEUE_SIZE : maxQueueSize;
|
||||||
|
this.metricMapPath = metricMapPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
OpentsdbEmitterConfig that = (OpentsdbEmitterConfig) o;
|
||||||
|
|
||||||
|
if (!host.equals(that.host)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (port != that.port) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (connectionTimeout != that.connectionTimeout) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (readTimeout != that.readTimeout) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (flushThreshold != that.flushThreshold) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (maxQueueSize != that.maxQueueSize) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return metricMapPath != null ? metricMapPath.equals(that.metricMapPath)
|
||||||
|
: that.metricMapPath == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = host.hashCode();
|
||||||
|
result = 31 * result + port;
|
||||||
|
result = 31 * result + connectionTimeout;
|
||||||
|
result = 31 * result + readTimeout;
|
||||||
|
result = 31 * result + flushThreshold;
|
||||||
|
result = 31 * result + maxQueueSize;
|
||||||
|
result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() : 0);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHost()
|
||||||
|
{
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPort()
|
||||||
|
{
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getConnectionTimeout()
|
||||||
|
{
|
||||||
|
return connectionTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getReadTimeout()
|
||||||
|
{
|
||||||
|
return readTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getFlushThreshold()
|
||||||
|
{
|
||||||
|
return flushThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxQueueSize()
|
||||||
|
{
|
||||||
|
return maxQueueSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMetricMapPath()
|
||||||
|
{
|
||||||
|
return metricMapPath;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.Module;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Provides;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
|
import io.druid.java.util.emitter.core.Emitter;
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
import io.druid.initialization.DruidModule;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class OpentsdbEmitterModule implements DruidModule
|
||||||
|
{
|
||||||
|
private static final String EMITTER_TYPE = "opentsdb";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<? extends Module> getJacksonModules()
|
||||||
|
{
|
||||||
|
return Collections.EMPTY_LIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, OpentsdbEmitterConfig.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@ManageLifecycle
|
||||||
|
@Named(EMITTER_TYPE)
|
||||||
|
public Emitter getEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper)
|
||||||
|
{
|
||||||
|
return new OpentsdbEmitter(config, mapper);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class OpentsdbEvent
|
||||||
|
{
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final String metric;
|
||||||
|
|
||||||
|
// timestamp in seconds
|
||||||
|
@JsonProperty
|
||||||
|
private final long timestamp;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final Object value;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
private final Map<String, Object> tags;
|
||||||
|
|
||||||
|
public OpentsdbEvent(
|
||||||
|
@JsonProperty("metric") String metric,
|
||||||
|
@JsonProperty("timestamp") Long timestamp,
|
||||||
|
@JsonProperty("value") Object value,
|
||||||
|
@JsonProperty("tags") Map<String, Object> tags
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.metric = Preconditions.checkNotNull(metric, "metric can not be null.");
|
||||||
|
this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp can not be null.");
|
||||||
|
this.value = Preconditions.checkNotNull(value, "value can not be null.");
|
||||||
|
this.tags = Preconditions.checkNotNull(tags, "tags can not be null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
OpentsdbEvent that = (OpentsdbEvent) o;
|
||||||
|
|
||||||
|
if (!metric.equals(that.metric)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (timestamp != that.timestamp) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!value.equals(that.value)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return tags.equals(that.tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
int result = metric.hashCode();
|
||||||
|
result = 31 * result + (int) timestamp;
|
||||||
|
result = 31 * result + value.hashCode();
|
||||||
|
result = 31 * result + tags.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMetric()
|
||||||
|
{
|
||||||
|
return metric;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimestamp()
|
||||||
|
{
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getValue()
|
||||||
|
{
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getTags()
|
||||||
|
{
|
||||||
|
return tags;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,129 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.sun.jersey.api.client.Client;
|
||||||
|
import com.sun.jersey.api.client.WebResource;
|
||||||
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class OpentsdbSender
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* @see <a href="http://opentsdb.net/docs/build/html/api_http/put.html">Opentsdb - /api/put</a>
|
||||||
|
*/
|
||||||
|
private static final String PATH = "/api/put";
|
||||||
|
private static final Logger log = new Logger(OpentsdbSender.class);
|
||||||
|
|
||||||
|
private final AtomicLong countLostEvents = new AtomicLong(0);
|
||||||
|
private final int flushThreshold;
|
||||||
|
private final List<OpentsdbEvent> events;
|
||||||
|
private final BlockingQueue<OpentsdbEvent> eventQueue;
|
||||||
|
private final Client client;
|
||||||
|
private final WebResource webResource;
|
||||||
|
private final ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
private volatile boolean running = true;
|
||||||
|
|
||||||
|
public OpentsdbSender(String host, int port, int connectionTimeout, int readTimeout, int flushThreshold, int maxQueueSize)
|
||||||
|
{
|
||||||
|
this.flushThreshold = flushThreshold;
|
||||||
|
events = new ArrayList<>(flushThreshold);
|
||||||
|
eventQueue = new ArrayBlockingQueue<>(maxQueueSize);
|
||||||
|
|
||||||
|
client = Client.create();
|
||||||
|
client.setConnectTimeout(connectionTimeout);
|
||||||
|
client.setReadTimeout(readTimeout);
|
||||||
|
webResource = client.resource("http://" + host + ":" + port + PATH);
|
||||||
|
|
||||||
|
executor.execute(new EventConsumer());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void enqueue(OpentsdbEvent event)
|
||||||
|
{
|
||||||
|
if (!eventQueue.offer(event)) {
|
||||||
|
if (countLostEvents.getAndIncrement() % 1000 == 0) {
|
||||||
|
log.error(
|
||||||
|
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity.",
|
||||||
|
countLostEvents.get()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void flush()
|
||||||
|
{
|
||||||
|
sendEvents();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
flush();
|
||||||
|
client.destroy();
|
||||||
|
running = false;
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendEvents()
|
||||||
|
{
|
||||||
|
if (!events.isEmpty()) {
|
||||||
|
try {
|
||||||
|
webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "send to opentsdb server failed");
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
events.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class EventConsumer implements Runnable
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
while (running) {
|
||||||
|
if (!eventQueue.isEmpty()) {
|
||||||
|
OpentsdbEvent event = eventQueue.poll();
|
||||||
|
events.add(event);
|
||||||
|
if (events.size() >= flushThreshold) {
|
||||||
|
sendEvents();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
WebResource getWebResource()
|
||||||
|
{
|
||||||
|
return webResource;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
io.druid.emitter.opentsdb.OpentsdbEmitterModule
|
|
@ -0,0 +1,205 @@
|
||||||
|
{
|
||||||
|
"query/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/bytes": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"query/node/time": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"query/node/bytes": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"query/node/ttfb": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"query/intervalChunk/time": [
|
||||||
|
"chunkInterval"
|
||||||
|
],
|
||||||
|
"query/success/count": [],
|
||||||
|
"query/failed/count": [],
|
||||||
|
"query/interrupted/count": [],
|
||||||
|
"query/wait/time": [
|
||||||
|
"segment"
|
||||||
|
],
|
||||||
|
"query/cpu/time": [
|
||||||
|
"dataSource",
|
||||||
|
"type"
|
||||||
|
],
|
||||||
|
"jetty/numOpenConnections": [],
|
||||||
|
"query/cache/delta/numEntries": [],
|
||||||
|
"query/cache/delta/sizeBytes": [],
|
||||||
|
"query/cache/delta/hits": [],
|
||||||
|
"query/cache/delta/misses": [],
|
||||||
|
"query/cache/delta/evictions": [],
|
||||||
|
"query/cache/delta/hitRate": [],
|
||||||
|
"query/cache/delta/averageBytes": [],
|
||||||
|
"query/cache/delta/timeouts": [],
|
||||||
|
"query/cache/delta/errors": [],
|
||||||
|
"query/cache/total/numEntries": [],
|
||||||
|
"query/cache/total/sizeBytes": [],
|
||||||
|
"query/cache/total/hits": [],
|
||||||
|
"query/cache/total/misses": [],
|
||||||
|
"query/cache/total/evictions": [],
|
||||||
|
"query/cache/total/hitRate": [],
|
||||||
|
"query/cache/total/averageBytes": [],
|
||||||
|
"query/cache/total/timeouts": [],
|
||||||
|
"query/cache/total/errors": [],
|
||||||
|
"ingest/events/thrownAway": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/events/unparseable": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/events/processed": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/rows/output": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/persist/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/persist/time": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/persist/cpu": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/persist/backPressure": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/persist/failed": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/handoff/failed": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/merge/time": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/merge/cpu": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/handoff/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/sink/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/events/messageGap": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/kafka/lag": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"task/run/time": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/added/bytes": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/moved/bytes": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/nuked/bytes": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/assigned/count": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/moved/count": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/dropped/count": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/deleted/count": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/unneeded/count": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/cost/raw": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/cost/normalization": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/cost/normalized": [
|
||||||
|
"tier"
|
||||||
|
],
|
||||||
|
"segment/loadQueue/size": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"segment/loadQueue/failed": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"segment/loadQueue/count": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"segment/dropQueue/count": [
|
||||||
|
"server"
|
||||||
|
],
|
||||||
|
"segment/size": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/overShadowed/count": [],
|
||||||
|
"segment/unavailable/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/underReplicated/count": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/max": [],
|
||||||
|
"segment/used": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/usedPercent": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"segment/pendingDelete": [],
|
||||||
|
"jvm/pool/committed": [],
|
||||||
|
"jvm/pool/init": [],
|
||||||
|
"jvm/pool/max": [],
|
||||||
|
"jvm/pool/used": [],
|
||||||
|
"jvm/bufferpool/count": [],
|
||||||
|
"jvm/bufferpool/used": [],
|
||||||
|
"jvm/bufferpool/capacity": [],
|
||||||
|
"jvm/mem/init": [],
|
||||||
|
"jvm/mem/max": [],
|
||||||
|
"jvm/mem/used": [],
|
||||||
|
"jvm/mem/committed": [],
|
||||||
|
"jvm/gc/count": [],
|
||||||
|
"jvm/gc/time": [],
|
||||||
|
"ingest/events/buffered": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"ingest/bytes/received": [
|
||||||
|
"dataSource"
|
||||||
|
],
|
||||||
|
"sys/swap/free": [],
|
||||||
|
"sys/swap/max": [],
|
||||||
|
"sys/swap/pageIn": [],
|
||||||
|
"sys/swap/pageOut": [],
|
||||||
|
"sys/disk/write/count": [],
|
||||||
|
"sys/disk/read/count": [],
|
||||||
|
"sys/disk/write/size": [],
|
||||||
|
"sys/disk/read/size": [],
|
||||||
|
"sys/net/write/size": [],
|
||||||
|
"sys/net/read/size": [],
|
||||||
|
"sys/fs/used": [],
|
||||||
|
"sys/fs/max": [],
|
||||||
|
"sys/mem/used": [],
|
||||||
|
"sys/mem/max": [],
|
||||||
|
"sys/storage/used": [],
|
||||||
|
"sys/cpu": [],
|
||||||
|
"coordinator-segment/count": [],
|
||||||
|
"historical-segment/count": []
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import io.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||||
|
import io.druid.java.util.common.DateTimes;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
public class EventConverterTest
|
||||||
|
{
|
||||||
|
private EventConverter converter;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
converter = new EventConverter(new ObjectMapper(), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSanitize()
|
||||||
|
{
|
||||||
|
String metric = " foo bar/baz";
|
||||||
|
Assert.assertEquals("foo_bar.baz", converter.sanitize(metric));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvert() throws Exception
|
||||||
|
{
|
||||||
|
DateTime dateTime = DateTimes.nowUtc();
|
||||||
|
ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder()
|
||||||
|
.setDimension("dataSource", "data-source")
|
||||||
|
.setDimension("type", "groupBy")
|
||||||
|
.build(dateTime, "query/time", 10)
|
||||||
|
.build("broker", "brokerHost1");
|
||||||
|
|
||||||
|
Map<String, Object> expectedTags = new HashMap<>();
|
||||||
|
expectedTags.put("service", "broker");
|
||||||
|
expectedTags.put("host", "brokerHost1");
|
||||||
|
expectedTags.put("dataSource", "data-source");
|
||||||
|
expectedTags.put("type", "groupBy");
|
||||||
|
|
||||||
|
OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent);
|
||||||
|
assertEquals("query.time", opentsdbEvent.getMetric());
|
||||||
|
assertEquals(dateTime.getMillis() / 1000L, opentsdbEvent.getTimestamp());
|
||||||
|
assertEquals(10, opentsdbEvent.getValue());
|
||||||
|
assertEquals(expectedTags, opentsdbEvent.getTags());
|
||||||
|
|
||||||
|
ServiceMetricEvent notConfiguredEvent = new ServiceMetricEvent.Builder()
|
||||||
|
.setDimension("dataSource", "data-source")
|
||||||
|
.setDimension("type", "groupBy")
|
||||||
|
.build(dateTime, "foo/bar", 10)
|
||||||
|
.build("broker", "brokerHost1");
|
||||||
|
assertEquals(null, converter.convert(notConfiguredEvent));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class OpentsdbEmitterConfigTest
|
||||||
|
{
|
||||||
|
private ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerDeserOpentsdbEmitterConfig() throws Exception
|
||||||
|
{
|
||||||
|
OpentsdbEmitterConfig opentsdbEmitterConfig = new OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null);
|
||||||
|
String opentsdbEmitterConfigString = mapper.writeValueAsString(opentsdbEmitterConfig);
|
||||||
|
OpentsdbEmitterConfig expectedOpentsdbEmitterConfig = mapper.reader(OpentsdbEmitterConfig.class)
|
||||||
|
.readValue(opentsdbEmitterConfigString);
|
||||||
|
Assert.assertEquals(expectedOpentsdbEmitterConfig, opentsdbEmitterConfig);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.InjectableValues;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class OpentsdbEventTest
|
||||||
|
{
|
||||||
|
private ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerDeserOpentsdbEvent() throws Exception
|
||||||
|
{
|
||||||
|
Map<String, Object> tags = new HashMap<>();
|
||||||
|
tags.put("foo", "bar");
|
||||||
|
tags.put("baz", 1);
|
||||||
|
OpentsdbEvent opentsdbEvent = new OpentsdbEvent("foo.bar", 1000L, 20, tags);
|
||||||
|
String opentsdbString = mapper.writeValueAsString(opentsdbEvent);
|
||||||
|
OpentsdbEvent expectedOpentsdbEvent = mapper.reader(OpentsdbEvent.class)
|
||||||
|
.readValue(opentsdbString);
|
||||||
|
Assert.assertEquals(expectedOpentsdbEvent, opentsdbEvent);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.emitter.opentsdb;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class OpentsdbSenderTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testUrl() throws Exception
|
||||||
|
{
|
||||||
|
OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 100, 1000);
|
||||||
|
String expectedUrl = "http://localhost:9999/api/put";
|
||||||
|
Assert.assertEquals(expectedUrl, sender.getWebResource().getURI().toString());
|
||||||
|
}
|
||||||
|
}
|
1
pom.xml
1
pom.xml
|
@ -143,6 +143,7 @@
|
||||||
<module>extensions-contrib/sqlserver-metadata-storage</module>
|
<module>extensions-contrib/sqlserver-metadata-storage</module>
|
||||||
<module>extensions-contrib/kafka-emitter</module>
|
<module>extensions-contrib/kafka-emitter</module>
|
||||||
<module>extensions-contrib/redis-cache</module>
|
<module>extensions-contrib/redis-cache</module>
|
||||||
|
<module>extensions-contrib/opentsdb-emitter</module>
|
||||||
<!-- distribution packaging -->
|
<!-- distribution packaging -->
|
||||||
<module>distribution</module>
|
<module>distribution</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
Loading…
Reference in New Issue