mirror of https://github.com/apache/druid.git
commit
a752bc064e
|
@ -150,6 +150,11 @@ The Druid servers emit various metrics and alerts via something we call an Emitt
|
|||
|--------|-----------|-------|
|
||||
|`druid.emitter.composing.emitters`|List of emitter modules to load e.g. ["logging","http"].|[]|
|
||||
|
||||
#### Graphite Emitter
|
||||
|
||||
To use graphite as emitter set `druid.emitter=graphite`. For configuration details please follow this [link](https://github.com/druid-io/druid/tree/master/extensions/graphite-emitter/README.md).
|
||||
|
||||
|
||||
### Metadata Storage
|
||||
|
||||
These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html), [Indexing service](../design/indexing-service.html) and [Realtime Nodes](../design/realtime.html).
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
## introduction
|
||||
|
||||
This extension emits druid metrics to a graphite carbon server.
|
||||
Events are sent after been [pickled](http://graphite.readthedocs.org/en/latest/feeding-carbon.html#the-pickle-protocol); the size of the batch is configurable.
|
||||
|
||||
## configuration
|
||||
|
||||
All the configuration parameters for graphite emitter are under `druid.emitter.graphite`.
|
||||
|
||||
|property|description|required?|default|
|
||||
|--------|-----------|---------|-------|
|
||||
|`druid.emitter.graphite.hostname`|The hostname of the graphite server.|yes|none|
|
||||
|`druid.emitter.graphite.port`|The port of the graphite server.|yes|none|
|
||||
|`druid.emitter.graphite.batchSize`|Number of events to send as one batch.|no|100|
|
||||
|`druid.emitter.graphite.eventConverter`| Filter and converter of druid events to graphite event(please see next section). |yes|none|
|
||||
|`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|
||||
|`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|
||||
|`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)|
|
||||
|
||||
### Druid to Graphite Event Converter
|
||||
|
||||
Graphite Event Converter defines a mapping between druid metrics name plus dimensions to a Graphite metric path.
|
||||
Graphite metric path is organized using the following schema:
|
||||
`<namespacePrefix>.[<druid service name>].[<druid hostname>].<druid metrics dimensions>.<druid metrics name>`
|
||||
Properly naming the metrics is critical to avoid conflicts, confusing data and potentially wrong interpretation later on.
|
||||
|
||||
Example `druid.historical.hist-host1_yahoo_com:8080.MyDataSourceName.GroupBy.query/time`:
|
||||
|
||||
* `druid` -> namespace prefix
|
||||
* `historical` -> service name
|
||||
* `hist-host1.yahoo.com:8080` -> druid hostname
|
||||
* `MyDataSourceName` -> dimension value
|
||||
* `GroupBy` -> dimension value
|
||||
* `query/time` -> metric name
|
||||
|
||||
We have two different implementation of event converter:
|
||||
|
||||
#### Send-All converter
|
||||
|
||||
The first implementation called `all`, will send all the druid service metrics events.
|
||||
The path will be in the form `<namespacePrefix>.[<druid service name>].[<druid hostname>].<dimensions values ordered by dimension's name>.<metric>`
|
||||
User has control of `<namespacePrefix>.[<druid service name>].[<druid hostname>].`
|
||||
|
||||
You can omit the hostname by setting `ignoreHostname=true`
|
||||
`druid.SERVICE_NAME.dataSourceName.queryType.query.time`
|
||||
|
||||
You can omit the service name by setting `ignoreServiceName=true`
|
||||
`druid.HOSTNAME.dataSourceName.queryType.query.time`
|
||||
|
||||
```json
|
||||
|
||||
druid.emitter.graphite.eventConverter={"type":"all", "namespacePrefix": "druid.test", "ignoreHostname":true, "ignoreServiceName":true}
|
||||
|
||||
```
|
||||
|
||||
#### White-list based converter
|
||||
|
||||
The second implementation called `whiteList`, will send only the white listed metrics and dimensions.
|
||||
Same as for the `all` converter user has control of `<namespacePrefix>.[<druid service name>].[<druid hostname>].`
|
||||
White-list based converter comes with the following default white list map located under resources [defaultWhiteListMap.json](./src/main/resources/defaultWhiteListMap.json)
|
||||
|
||||
Although user can override the default white list map by supplying a property called `mapPath`.
|
||||
This property is a String containing the path for the file containing **white list map Json object**.
|
||||
For example the following converter will read the map from the file `/pathPrefix/fileName.json`.
|
||||
|
||||
```json
|
||||
|
||||
druid.emitter.graphite.eventConverter={"type":"whiteList", "namespacePrefix": "druid.test", "ignoreHostname":true, "ignoreServiceName":true, "mapPath":"/pathPrefix/fileName.json"}
|
||||
|
||||
```
|
||||
|
||||
**Druid emits a huge number of metrics we highly recommend to use the `whiteList` converter**
|
|
@ -0,0 +1,91 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. Metamarkets licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>io.druid.extensions</groupId>
|
||||
<artifactId>graphite-emitter</artifactId>
|
||||
<name>graphite-emitter</name>
|
||||
<description>Druid emitter extension to support graphite</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-common</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-api</artifactId>
|
||||
<version>${druid.api.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>emitter</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-graphite</artifactId>
|
||||
<version>3.1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.easymock</groupId>
|
||||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>pl.pragmatists</groupId>
|
||||
<artifactId>JUnitParams</artifactId>
|
||||
<version>1.0.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-server</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = WhiteListBasedConverter.class)
|
||||
@JsonSubTypes(value= {
|
||||
@JsonSubTypes.Type(name="all", value = SendAllGraphiteEventConverter.class),
|
||||
@JsonSubTypes.Type(name="whiteList", value = WhiteListBasedConverter.class)
|
||||
})
|
||||
|
||||
public interface DruidToGraphiteEventConverter
|
||||
{
|
||||
/**
|
||||
* This function acts as a filter. It returns <tt>null</tt> if the event is not suppose to be emitted to Graphite
|
||||
* Also This function will define the mapping between the druid event dimension's values and Graphite metric Path
|
||||
*
|
||||
* @param serviceMetricEvent Druid event ot type {@link ServiceMetricEvent}
|
||||
*
|
||||
* @return {@link GraphiteEvent} or <tt>null</tt>
|
||||
*/
|
||||
GraphiteEvent druidEventToGraphite(ServiceMetricEvent serviceMetricEvent);
|
||||
}
|
|
@ -0,0 +1,214 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.codahale.metrics.graphite.PickledGraphite;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.core.Emitter;
|
||||
import com.metamx.emitter.core.Event;
|
||||
import com.metamx.emitter.service.AlertEvent;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
||||
public class GraphiteEmitter implements Emitter
|
||||
{
|
||||
private static Logger log = new Logger(GraphiteEmitter.class);
|
||||
|
||||
private final DruidToGraphiteEventConverter graphiteEventConverter;
|
||||
private final GraphiteEmitterConfig graphiteEmitterConfig;
|
||||
private final List<Emitter> emitterList;
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
private final ObjectMapper mapper;
|
||||
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
|
||||
private final static long DEFAULT_PUT_GET_TIMEOUT = 1000; // default wait for put/get operations on the queue 1 sec
|
||||
private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
|
||||
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
|
||||
.setDaemon(true)
|
||||
.setNameFormat("GraphiteEmitter-%s")
|
||||
.build()); // Thread pool of two in order to schedule flush runnable
|
||||
|
||||
public GraphiteEmitter(
|
||||
@NotNull GraphiteEmitterConfig graphiteEmitterConfig,
|
||||
List<Emitter> emitterList, @NotNull ObjectMapper mapper
|
||||
)
|
||||
{
|
||||
this.emitterList = emitterList;
|
||||
this.mapper = mapper;
|
||||
this.graphiteEmitterConfig = graphiteEmitterConfig;
|
||||
this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter();
|
||||
this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start()
|
||||
{
|
||||
log.info("Starting Graphite Emitter.");
|
||||
synchronized (started) {
|
||||
if (!started.get()) {
|
||||
exec.scheduleAtFixedRate(
|
||||
new ConsumerRunnable(),
|
||||
graphiteEmitterConfig.getFlushPeriod(),
|
||||
graphiteEmitterConfig.getFlushPeriod(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
started.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void emit(Event event)
|
||||
{
|
||||
if (!started.get()) {
|
||||
throw new ISE("WTF emit was called while service is not started yet");
|
||||
}
|
||||
if (event instanceof ServiceMetricEvent) {
|
||||
final GraphiteEvent graphiteEvent = graphiteEventConverter.druidEventToGraphite((ServiceMetricEvent) event);
|
||||
if (graphiteEvent == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
final boolean isSuccessful = eventsQueue.offer(graphiteEvent, DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
if (!isSuccessful) {
|
||||
log.error(
|
||||
"Throwing event [%s] on the floor Graphite queue is full please increase the capacity or/and the consumer frequency",
|
||||
mapper.writeValueAsString(event)
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.error(e, "got interrupted with message [%s]", e.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
log.error(e, e.getMessage());
|
||||
}
|
||||
} else if (!emitterList.isEmpty() && event instanceof AlertEvent) {
|
||||
for (Emitter emitter : emitterList) {
|
||||
emitter.emit(event);
|
||||
}
|
||||
} else {
|
||||
throw new ISE("unknown event type [%s]", event.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
private class ConsumerRunnable implements Runnable
|
||||
{
|
||||
private PickledGraphite pickledGraphite = new PickledGraphite(
|
||||
graphiteEmitterConfig.getHostname(),
|
||||
graphiteEmitterConfig.getPort(),
|
||||
graphiteEmitterConfig.getBatchSize()
|
||||
);
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
|
||||
if (!pickledGraphite.isConnected()) {
|
||||
log.info("trying to connect to graphite server");
|
||||
pickledGraphite.connect();
|
||||
}
|
||||
while (eventsQueue.size() > 0 && !exec.isShutdown()) {
|
||||
try {
|
||||
final GraphiteEvent graphiteEvent = eventsQueue.poll(DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
if (graphiteEvent != null) {
|
||||
log.debug(
|
||||
"sent [%s] with value [%s] and time [%s]",
|
||||
graphiteEvent.getEventPath(),
|
||||
graphiteEvent.getValue(),
|
||||
graphiteEvent.getTimestamp()
|
||||
);
|
||||
pickledGraphite.send(
|
||||
graphiteEvent.getEventPath(),
|
||||
graphiteEvent.getValue(),
|
||||
graphiteEvent.getTimestamp()
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException | IOException e) {
|
||||
log.error(e, e.getMessage());
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
pickledGraphite.flush();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, e.getMessage());
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException
|
||||
{
|
||||
if (started.get()) {
|
||||
Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
|
||||
try {
|
||||
future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
if (e instanceof InterruptedException) {
|
||||
throw new RuntimeException("interrupted flushing elements from queue", e);
|
||||
}
|
||||
log.error(e, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
flush();
|
||||
started.set(false);
|
||||
exec.shutdown();
|
||||
}
|
||||
|
||||
protected static String sanitize(String namespace)
|
||||
{
|
||||
Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");
|
||||
return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,170 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public class GraphiteEmitterConfig
|
||||
{
|
||||
private final static int DEFAULT_BATCH_SIZE = 100;
|
||||
private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
|
||||
|
||||
@JsonProperty
|
||||
final private String hostname;
|
||||
@JsonProperty
|
||||
final private int port;
|
||||
@JsonProperty
|
||||
final private int batchSize;
|
||||
@JsonProperty
|
||||
final private Long flushPeriod;
|
||||
@JsonProperty
|
||||
final private Integer maxQueueSize;
|
||||
@JsonProperty("eventConverter")
|
||||
final private DruidToGraphiteEventConverter druidToGraphiteEventConverter;
|
||||
|
||||
@JsonProperty
|
||||
final private List<String> alertEmitters;
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof GraphiteEmitterConfig)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
GraphiteEmitterConfig that = (GraphiteEmitterConfig) o;
|
||||
|
||||
if (getPort() != that.getPort()) {
|
||||
return false;
|
||||
}
|
||||
if (getBatchSize() != that.getBatchSize()) {
|
||||
return false;
|
||||
}
|
||||
if (getHostname() != null ? !getHostname().equals(that.getHostname()) : that.getHostname() != null) {
|
||||
return false;
|
||||
}
|
||||
if (getFlushPeriod() != null ? !getFlushPeriod().equals(that.getFlushPeriod()) : that.getFlushPeriod() != null) {
|
||||
return false;
|
||||
}
|
||||
if (getMaxQueueSize() != null
|
||||
? !getMaxQueueSize().equals(that.getMaxQueueSize())
|
||||
: that.getMaxQueueSize() != null) {
|
||||
return false;
|
||||
}
|
||||
if (getDruidToGraphiteEventConverter() != null
|
||||
? !getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())
|
||||
: that.getDruidToGraphiteEventConverter() != null) {
|
||||
return false;
|
||||
}
|
||||
return !(getAlertEmitters() != null
|
||||
? !getAlertEmitters().equals(that.getAlertEmitters())
|
||||
: that.getAlertEmitters() != null);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = getHostname() != null ? getHostname().hashCode() : 0;
|
||||
result = 31 * result + getPort();
|
||||
result = 31 * result + getBatchSize();
|
||||
result = 31 * result + (getFlushPeriod() != null ? getFlushPeriod().hashCode() : 0);
|
||||
result = 31 * result + (getMaxQueueSize() != null ? getMaxQueueSize().hashCode() : 0);
|
||||
result = 31 * result + (getDruidToGraphiteEventConverter() != null
|
||||
? getDruidToGraphiteEventConverter().hashCode()
|
||||
: 0);
|
||||
result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public GraphiteEmitterConfig(
|
||||
@JsonProperty("hostname") String hostname,
|
||||
@JsonProperty("port") Integer port,
|
||||
@JsonProperty("batchSize") Integer batchSize,
|
||||
@JsonProperty("flushPeriod") Long flushPeriod,
|
||||
@JsonProperty("maxQueueSize") Integer maxQueueSize,
|
||||
@JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter,
|
||||
@JsonProperty("alertEmitters") List<String> alertEmitters
|
||||
)
|
||||
{
|
||||
this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters;
|
||||
this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
|
||||
druidToGraphiteEventConverter,
|
||||
"Event converter can not ne null dude"
|
||||
);
|
||||
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
|
||||
this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
|
||||
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
|
||||
this.port = Preconditions.checkNotNull(port, "port can not be null");
|
||||
this.batchSize = (batchSize == null) ? DEFAULT_BATCH_SIZE : batchSize;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getHostname()
|
||||
{
|
||||
return hostname;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getPort()
|
||||
{
|
||||
return port;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getBatchSize()
|
||||
{
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Integer getMaxQueueSize()
|
||||
{
|
||||
return maxQueueSize;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Long getFlushPeriod()
|
||||
{
|
||||
return flushPeriod;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public DruidToGraphiteEventConverter getDruidToGraphiteEventConverter()
|
||||
{
|
||||
return druidToGraphiteEventConverter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<String> getAlertEmitters()
|
||||
{
|
||||
return alertEmitters;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.name.Named;
|
||||
import com.google.inject.name.Names;
|
||||
import com.metamx.emitter.core.Emitter;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.initialization.DruidModule;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class GraphiteEmitterModule implements DruidModule
|
||||
{
|
||||
private static final String EMITTER_TYPE = "graphite";
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return Collections.EMPTY_LIST;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, GraphiteEmitterConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@ManageLifecycle
|
||||
@Named(EMITTER_TYPE)
|
||||
public Emitter getEmitter(GraphiteEmitterConfig graphiteEmitterConfig, ObjectMapper mapper, final Injector injector){
|
||||
List<Emitter> emitters = Lists.transform(
|
||||
graphiteEmitterConfig.getAlertEmitters(),
|
||||
new Function<String, Emitter>()
|
||||
{
|
||||
@Override
|
||||
public Emitter apply(String s)
|
||||
{
|
||||
return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
|
||||
}
|
||||
}
|
||||
);
|
||||
return new GraphiteEmitter(graphiteEmitterConfig, emitters, mapper);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class GraphiteEvent
|
||||
{
|
||||
private final String eventPath;
|
||||
private final String value;
|
||||
private final long timestamp;
|
||||
|
||||
/**
|
||||
* A graphite event must be in the following format: <metric path> <metric value> <metric timestamp>
|
||||
* ex: PRODUCTION.host.graphite-tutorial.responseTime.p95 0.10 1400509112
|
||||
* @param eventPath This is the namespace path of the metric
|
||||
* @param value value of the metric
|
||||
* @param timestamp unix time in second
|
||||
*/
|
||||
GraphiteEvent(@NotNull String eventPath, @NotNull String value, @NotNull Long timestamp)
|
||||
{
|
||||
this.eventPath = Preconditions.checkNotNull(eventPath, "path can not be null");
|
||||
this.value = Preconditions.checkNotNull(value, "value can not be null");
|
||||
this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp can not be null");
|
||||
}
|
||||
|
||||
public String getEventPath()
|
||||
{
|
||||
return eventPath;
|
||||
}
|
||||
|
||||
public String getValue()
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
public long getTimestamp()
|
||||
{
|
||||
return timestamp;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSortedSet;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Emits all the events instance of {@link com.metamx.emitter.service.ServiceMetricEvent}.
|
||||
* <p>
|
||||
* All the dimensions will be retained and lexicographically order using dimensions name.
|
||||
* <p>
|
||||
* The metric path of the graphite event is:
|
||||
* <namespacePrefix>.[<druid service name>].[<druid hostname>].<dimensions values ordered by dimension's name>.<metric>
|
||||
* <p>
|
||||
* Note that this path will be sanitized by replacing all the `.` or `space` to `_` {@link GraphiteEmitter#sanitize(String)}
|
||||
*/
|
||||
|
||||
@JsonTypeName("all")
|
||||
public class SendAllGraphiteEventConverter implements DruidToGraphiteEventConverter
|
||||
{
|
||||
@JsonProperty
|
||||
private final boolean ignoreHostname;
|
||||
|
||||
@JsonProperty
|
||||
private final boolean ignoreServiceName;
|
||||
|
||||
@JsonProperty
|
||||
private final String namespacePrefix;
|
||||
|
||||
@JsonProperty
|
||||
public String getNamespacePrefix()
|
||||
{
|
||||
return namespacePrefix;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isIgnoreServiceName()
|
||||
{
|
||||
return ignoreServiceName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isIgnoreHostname()
|
||||
{
|
||||
return ignoreHostname;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public SendAllGraphiteEventConverter(
|
||||
@JsonProperty("namespacePrefix") String namespacePrefix,
|
||||
@JsonProperty("ignoreHostname") Boolean ignoreHostname,
|
||||
@JsonProperty("ignoreServiceName") Boolean ignoreServiceName
|
||||
)
|
||||
{
|
||||
this.ignoreHostname = ignoreHostname == null ? false : ignoreHostname;
|
||||
this.ignoreServiceName = ignoreServiceName == null ? false : ignoreServiceName;
|
||||
this.namespacePrefix = Preconditions.checkNotNull(namespacePrefix, "namespace prefix can not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphiteEvent druidEventToGraphite(ServiceMetricEvent serviceMetricEvent)
|
||||
{
|
||||
ImmutableList.Builder metricPathBuilder = new ImmutableList.Builder<String>();
|
||||
metricPathBuilder.add(this.getNamespacePrefix());
|
||||
if (!this.isIgnoreServiceName()) {
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getService()));
|
||||
}
|
||||
if (!this.isIgnoreHostname()) {
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getHost()));
|
||||
}
|
||||
|
||||
ImmutableSortedSet<String> dimNames = ImmutableSortedSet.copyOf(serviceMetricEvent.getUserDims().keySet());
|
||||
for (String dimName : dimNames) {
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(String.valueOf(serviceMetricEvent.getUserDims()
|
||||
.get(dimName))));
|
||||
}
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getMetric()));
|
||||
|
||||
return new GraphiteEvent(
|
||||
Joiner.on(".").join(metricPathBuilder.build()),
|
||||
serviceMetricEvent.getValue().toString(),
|
||||
TimeUnit.MILLISECONDS.toSeconds(serviceMetricEvent.getCreatedTime().getMillis())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof SendAllGraphiteEventConverter)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SendAllGraphiteEventConverter that = (SendAllGraphiteEventConverter) o;
|
||||
|
||||
if (isIgnoreHostname() != that.isIgnoreHostname()) {
|
||||
return false;
|
||||
}
|
||||
if (isIgnoreServiceName() != that.isIgnoreServiceName()) {
|
||||
return false;
|
||||
}
|
||||
return getNamespacePrefix().equals(that.getNamespacePrefix());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = (isIgnoreHostname() ? 1 : 0);
|
||||
result = 31 * result + (isIgnoreServiceName() ? 1 : 0);
|
||||
result = 31 * result + getNamespacePrefix().hashCode();
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,265 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.ImmutableSortedMap;
|
||||
import com.google.common.io.Files;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@JsonTypeName("whiteList")
|
||||
public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
|
||||
{
|
||||
private static final Logger LOGGER = new Logger(WhiteListBasedConverter.class);
|
||||
/**
|
||||
* @code whiteListDimsMapper is a white list of metric->dimensions mappings.
|
||||
* Key is the metric name or the metric's prefix.
|
||||
* Value is a list of metric's dimensions names.
|
||||
* The order of the dimension name is important, it will be used to build the graphite metric path.
|
||||
* For instance we have dimension type is nested under dimension dataSource -> prefix.dataSource.queryType.metricName
|
||||
*/
|
||||
private final ImmutableSortedMap<String, ImmutableSet<String>> whiteListDimsMapper;
|
||||
|
||||
@JsonProperty
|
||||
private final boolean ignoreHostname;
|
||||
|
||||
@JsonProperty
|
||||
private final boolean ignoreServiceName;
|
||||
|
||||
@JsonProperty
|
||||
private final String namespacePrefix;
|
||||
|
||||
@JsonProperty
|
||||
private final String mapPath;
|
||||
|
||||
private final ObjectMapper mapper;
|
||||
|
||||
@JsonCreator
|
||||
public WhiteListBasedConverter(
|
||||
@JsonProperty("namespacePrefix") String namespacePrefix,
|
||||
@JsonProperty("ignoreHostname") Boolean ignoreHostname,
|
||||
@JsonProperty("ignoreServiceName") Boolean ignoreServiceName,
|
||||
@JsonProperty("mapPath") String mapPath,
|
||||
@JacksonInject ObjectMapper mapper
|
||||
)
|
||||
{
|
||||
this.mapper = Preconditions.checkNotNull(mapper);
|
||||
this.mapPath = mapPath;
|
||||
this.whiteListDimsMapper = readMap(this.mapPath);
|
||||
this.ignoreHostname = ignoreHostname == null ? false : ignoreHostname;
|
||||
this.ignoreServiceName = ignoreServiceName == null ? false : ignoreServiceName;
|
||||
this.namespacePrefix = Preconditions.checkNotNull(namespacePrefix, "namespace prefix can not be null");
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isIgnoreHostname()
|
||||
{
|
||||
return ignoreHostname;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isIgnoreServiceName()
|
||||
{
|
||||
return ignoreServiceName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getNamespacePrefix()
|
||||
{
|
||||
return namespacePrefix;
|
||||
}
|
||||
|
||||
public ImmutableSortedMap<String, ImmutableSet<String>> getWhiteListDimsMapper()
|
||||
{
|
||||
return whiteListDimsMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param event Event subject to filtering
|
||||
*
|
||||
* @return true if and only if the event prefix key is in the {@code whiteListDimsMapper}
|
||||
*/
|
||||
private boolean isInWhiteList(ServiceMetricEvent event)
|
||||
{
|
||||
return getPrefixKey(event.getMetric(), whiteListDimsMapper) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param key the metric name to lookup
|
||||
* @param whiteList
|
||||
*
|
||||
* @return <tt>null</tt> if the key does not match with any of the prefixes keys in @code metricsWhiteList,
|
||||
* or the prefix in @code whiteListDimsMapper
|
||||
*/
|
||||
private String getPrefixKey(String key, SortedMap<String, ?> whiteList)
|
||||
{
|
||||
String prefixKey = null;
|
||||
if (whiteList.containsKey(key)) {
|
||||
return key;
|
||||
}
|
||||
SortedMap<String, ?> headMap = whiteList.headMap(key);
|
||||
if (!headMap.isEmpty() && key.startsWith(headMap.lastKey())) {
|
||||
prefixKey = headMap.lastKey();
|
||||
}
|
||||
return prefixKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link List} of the white-listed dimension's values to send.
|
||||
* The list is order is the same as the order of dimensions {@code whiteListDimsMapper}
|
||||
*
|
||||
* @param event the event for which will filter dimensions
|
||||
*
|
||||
* @return {@link List} of the filtered dimension values to send or <tt>null<tt/> if the event is not in the white list
|
||||
*/
|
||||
private List<String> getOrderedDimValues(ServiceMetricEvent event)
|
||||
{
|
||||
String prefixKey = getPrefixKey(event.getMetric(), whiteListDimsMapper);
|
||||
if (prefixKey == null) {
|
||||
return null;
|
||||
}
|
||||
ImmutableList.Builder<String> outputList = new ImmutableList.Builder();
|
||||
Set<String> dimensions = whiteListDimsMapper.get(prefixKey);
|
||||
if (dimensions == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
for (String dimKey : dimensions) {
|
||||
String dimValue = (String) event.getUserDims().get(dimKey);
|
||||
if (dimValue != null) {
|
||||
outputList.add(GraphiteEmitter.sanitize(dimValue));
|
||||
}
|
||||
}
|
||||
return outputList.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param serviceMetricEvent druid metric event to convert
|
||||
*
|
||||
* @return <tt>null</tt> if the event is not white listed, otherwise return {@link GraphiteEvent}
|
||||
* <p>
|
||||
* The metric path of the graphite event is:
|
||||
* <namespacePrefix>.[<druid service name>].[<druid hostname>].<white-listed dimensions>.<metric>
|
||||
* <p/>
|
||||
* The order of the dimension is the order returned by {@code getOrderedDimValues()}
|
||||
* Note that this path will be sanitized by replacing all the `.` or space by `_` {@link GraphiteEmitter#sanitize(String)}
|
||||
* </p>
|
||||
*/
|
||||
|
||||
@Override
|
||||
public GraphiteEvent druidEventToGraphite(ServiceMetricEvent serviceMetricEvent)
|
||||
{
|
||||
if (!this.isInWhiteList(serviceMetricEvent)) {
|
||||
return null;
|
||||
}
|
||||
final ImmutableList.Builder<String> metricPathBuilder = new ImmutableList.Builder<>();
|
||||
metricPathBuilder.add(this.getNamespacePrefix());
|
||||
if (!this.isIgnoreServiceName()) {
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getService()));
|
||||
}
|
||||
if (!this.isIgnoreHostname()) {
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getHost()));
|
||||
}
|
||||
metricPathBuilder.addAll(this.getOrderedDimValues(serviceMetricEvent));
|
||||
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getMetric()));
|
||||
|
||||
final GraphiteEvent graphiteEvent = new GraphiteEvent(
|
||||
Joiner.on(".").join(metricPathBuilder.build()),
|
||||
String.valueOf(serviceMetricEvent.getValue()),
|
||||
TimeUnit.MILLISECONDS.toSeconds(serviceMetricEvent.getCreatedTime().getMillis())
|
||||
);
|
||||
return graphiteEvent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof WhiteListBasedConverter)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
WhiteListBasedConverter that = (WhiteListBasedConverter) o;
|
||||
|
||||
if (isIgnoreHostname() != that.isIgnoreHostname()) {
|
||||
return false;
|
||||
}
|
||||
if (isIgnoreServiceName() != that.isIgnoreServiceName()) {
|
||||
return false;
|
||||
}
|
||||
if (!getNamespacePrefix().equals(that.getNamespacePrefix())) {
|
||||
return false;
|
||||
}
|
||||
return mapPath != null ? mapPath.equals(that.mapPath) : that.mapPath == null;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = (isIgnoreHostname() ? 1 : 0);
|
||||
result = 31 * result + (isIgnoreServiceName() ? 1 : 0);
|
||||
result = 31 * result + getNamespacePrefix().hashCode();
|
||||
result = 31 * result + (mapPath != null ? mapPath.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
private ImmutableSortedMap<String, ImmutableSet<String>> readMap(final String mapPath)
|
||||
{
|
||||
String fileContent;
|
||||
String actualPath = mapPath;
|
||||
if (Strings.isNullOrEmpty(mapPath)) {
|
||||
actualPath = this.getClass().getClassLoader().getResource("defaultWhiteListMap.json").getFile();
|
||||
LOGGER.warn("using default whiteList map located at [%s]", actualPath);
|
||||
}
|
||||
|
||||
try {
|
||||
fileContent = Files.asCharSource(new File(actualPath), Charset.forName("UTF-8")).read();
|
||||
return mapper.reader(new TypeReference<ImmutableSortedMap<String, ImmutableSet<String>>>()
|
||||
{
|
||||
}).readValue(fileContent);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "Got an exception while parsing file [%s]", actualPath);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
io.druid.emitter.graphite.GraphiteEmitterModule
|
|
@ -0,0 +1,48 @@
|
|||
{
|
||||
"ingest/events": [],
|
||||
"ingest/handoff/failed": [],
|
||||
"ingest/persists": [],
|
||||
"ingest/rows/output": [],
|
||||
"jvm/gc": [],
|
||||
"jvm/mem": [],
|
||||
"query/cpu/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/node/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/node/ttfb": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/partial/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/segment/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/segmentAndCache/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"query/wait/time": [
|
||||
"dataSource",
|
||||
"type"
|
||||
],
|
||||
"segment/count": [],
|
||||
"segment/dropQueue/count": [],
|
||||
"segment/loadQueue/count": [],
|
||||
"segment/loadQueue/failed": [],
|
||||
"segment/loadQueue/size": [],
|
||||
"segment/scan/pending": [],
|
||||
"segment/size": [],
|
||||
"segment/usedPercent": []
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.emitter.graphite;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class DruidToWhiteListBasedConverterTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testSanitize()
|
||||
{
|
||||
String test = "host name.yahoo.com:8080";
|
||||
Assert.assertEquals("host_name_yahoo_com:8080", GraphiteEmitter.sanitize(test));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
public class GraphiteEmitterConfigTest
|
||||
{
|
||||
private ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
mapper.setInjectableValues(new InjectableValues.Std().addValue(
|
||||
ObjectMapper.class,
|
||||
new DefaultObjectMapper()
|
||||
));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerDeserGraphiteEmitterConfig() throws IOException
|
||||
{
|
||||
GraphiteEmitterConfig graphiteEmitterConfig = new GraphiteEmitterConfig(
|
||||
"hostname",
|
||||
8080,
|
||||
1000,
|
||||
1000L,
|
||||
100,
|
||||
new SendAllGraphiteEventConverter("prefix", true, true),
|
||||
Collections.EMPTY_LIST
|
||||
);
|
||||
String graphiteEmitterConfigString = mapper.writeValueAsString(graphiteEmitterConfig);
|
||||
GraphiteEmitterConfig graphiteEmitterConfigExpected = mapper.reader(GraphiteEmitterConfig.class).readValue(
|
||||
graphiteEmitterConfigString
|
||||
);
|
||||
Assert.assertEquals(graphiteEmitterConfigExpected, graphiteEmitterConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerDeserDruidToGraphiteEventConverter() throws IOException
|
||||
{
|
||||
SendAllGraphiteEventConverter sendAllGraphiteEventConverter = new SendAllGraphiteEventConverter("prefix", true, true);
|
||||
String noopGraphiteEventConverterString = mapper.writeValueAsString(sendAllGraphiteEventConverter);
|
||||
DruidToGraphiteEventConverter druidToGraphiteEventConverter = mapper.reader(DruidToGraphiteEventConverter.class)
|
||||
.readValue(noopGraphiteEventConverterString);
|
||||
Assert.assertEquals(druidToGraphiteEventConverter, sendAllGraphiteEventConverter);
|
||||
|
||||
WhiteListBasedConverter whiteListBasedConverter = new WhiteListBasedConverter(
|
||||
"prefix",
|
||||
true,
|
||||
true,
|
||||
"",
|
||||
new DefaultObjectMapper()
|
||||
);
|
||||
String whiteListBasedConverterString = mapper.writeValueAsString(whiteListBasedConverter);
|
||||
druidToGraphiteEventConverter = mapper.reader(DruidToGraphiteEventConverter.class)
|
||||
.readValue(whiteListBasedConverterString);
|
||||
Assert.assertEquals(druidToGraphiteEventConverter, whiteListBasedConverter);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,140 @@
|
|||
package io.druid.emitter.graphite;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import junitparams.JUnitParamsRunner;
|
||||
import junitparams.Parameters;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
|
||||
@RunWith(JUnitParamsRunner.class)
|
||||
public class WhiteListBasedConverterTest
|
||||
{
|
||||
final private String prefix = "druid";
|
||||
final private WhiteListBasedConverter defaultWhiteListBasedConverter = new WhiteListBasedConverter(
|
||||
prefix,
|
||||
false,
|
||||
false,
|
||||
null,
|
||||
new DefaultObjectMapper()
|
||||
);
|
||||
private ServiceMetricEvent event;
|
||||
private DateTime createdTime = new DateTime();
|
||||
private String hostname = "testHost.yahoo.com:8080";
|
||||
private String serviceName = "historical";
|
||||
private String defaultNamespace = prefix + "." + serviceName + "." + GraphiteEmitter.sanitize(hostname);
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
event = EasyMock.createMock(ServiceMetricEvent.class);
|
||||
EasyMock.expect(event.getHost()).andReturn(hostname).anyTimes();
|
||||
EasyMock.expect(event.getService()).andReturn(serviceName).anyTimes();
|
||||
EasyMock.expect(event.getCreatedTime()).andReturn(createdTime).anyTimes();
|
||||
EasyMock.expect(event.getUserDims()).andReturn(Maps.<String, Object>newHashMap()).anyTimes();
|
||||
EasyMock.expect(event.getValue()).andReturn(10).anyTimes();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Parameters(
|
||||
{
|
||||
"query/time, true",
|
||||
"query/node/ttfb, true",
|
||||
"query/segmentAndCache/time, true",
|
||||
"query/intervalChunk/time, false",
|
||||
"query/time/balaba, true",
|
||||
"query/tim, false",
|
||||
"segment/added/bytes, false",
|
||||
"segment/count, true",
|
||||
"segment/size, true",
|
||||
"segment/cost/raw, false",
|
||||
"coordinator/TIER_1 /cost/raw, false",
|
||||
"segment/Kost/raw, false",
|
||||
", false",
|
||||
"word, false",
|
||||
"coordinator, false",
|
||||
"server/, false",
|
||||
"ingest/persists/time, true",
|
||||
"jvm/mem/init, true",
|
||||
"jvm/gc/count, true"
|
||||
}
|
||||
)
|
||||
public void testDefaultIsInWhiteList(String key, boolean expectedValue)
|
||||
{
|
||||
EasyMock.expect(event.getMetric()).andReturn(key).anyTimes();
|
||||
EasyMock.replay(event);
|
||||
boolean isIn = defaultWhiteListBasedConverter.druidEventToGraphite(event) != null;
|
||||
Assert.assertEquals(expectedValue, isIn);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Parameters
|
||||
public void testGetPath(ServiceMetricEvent serviceMetricEvent, String expectedPath)
|
||||
{
|
||||
GraphiteEvent graphiteEvent = defaultWhiteListBasedConverter.druidEventToGraphite(serviceMetricEvent);
|
||||
String path = null;
|
||||
if (graphiteEvent != null) {
|
||||
path = graphiteEvent.getEventPath();
|
||||
}
|
||||
Assert.assertEquals(expectedPath, path);
|
||||
}
|
||||
|
||||
private Object[] parametersForTestGetPath()
|
||||
{
|
||||
return new Object[]{
|
||||
new Object[]{
|
||||
new ServiceMetricEvent.Builder().setDimension("id", "dummy_id")
|
||||
.setDimension("status", "some_status")
|
||||
.setDimension("numDimensions", "1")
|
||||
.setDimension("segment", "dummy_segment")
|
||||
.build(createdTime, "query/segment/time/balabla/more", 10)
|
||||
.build(serviceName, hostname),
|
||||
defaultNamespace + ".query/segment/time/balabla/more"
|
||||
},
|
||||
new Object[]{
|
||||
new ServiceMetricEvent.Builder().setDimension("dataSource", "some_data_source")
|
||||
.setDimension("tier", "_default_tier")
|
||||
.build(createdTime, "segment/max", 10)
|
||||
.build(serviceName, hostname),
|
||||
null
|
||||
},
|
||||
new Object[]{
|
||||
new ServiceMetricEvent.Builder().setDimension("dataSource", "data-source")
|
||||
.setDimension("type", "groupBy")
|
||||
.setDimension("interval", "2013/2015")
|
||||
.setDimension("some_random_dim1", "random_dim_value1")
|
||||
.setDimension("some_random_dim2", "random_dim_value2")
|
||||
.setDimension("hasFilters", "no")
|
||||
.setDimension("duration", "P1D")
|
||||
.setDimension("remoteAddress", "194.0.90.2")
|
||||
.setDimension("id", "ID")
|
||||
.setDimension("context", "{context}")
|
||||
.build(createdTime, "query/time", 10)
|
||||
.build(serviceName, hostname),
|
||||
defaultNamespace + ".data-source.groupBy.query/time"
|
||||
},
|
||||
new Object[]{
|
||||
new ServiceMetricEvent.Builder().setDimension("dataSource", "data-source")
|
||||
.setDimension("type", "groupBy")
|
||||
.setDimension("some_random_dim1", "random_dim_value1")
|
||||
.build(createdTime, "ingest/persists/count", 10)
|
||||
.build(serviceName, hostname),
|
||||
defaultNamespace + ".ingest/persists/count"
|
||||
},
|
||||
new Object[]{
|
||||
new ServiceMetricEvent.Builder().setDimension("bufferPoolName", "BufferPool")
|
||||
.setDimension("type", "groupBy")
|
||||
.setDimension("some_random_dim1", "random_dim_value1")
|
||||
.build(createdTime, "jvm/bufferpool/capacity", 10)
|
||||
.build(serviceName, hostname),
|
||||
null
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
4
pom.xml
4
pom.xml
|
@ -92,6 +92,10 @@
|
|||
<module>extensions/cassandra-storage</module>
|
||||
<module>extensions/hdfs-storage</module>
|
||||
<module>extensions/s3-extensions</module>
|
||||
<module>extensions/graphite-emitter</module>
|
||||
<!-- kafka-seven extension is not built by default since
|
||||
Kafka 0.7 is not available in Maven Central -->
|
||||
<!-- <module>extensions/kafka-seven</module> -->
|
||||
<module>extensions/kafka-eight</module>
|
||||
<module>extensions/kafka-eight-simpleConsumer</module>
|
||||
<module>extensions/rabbitmq</module>
|
||||
|
|
Loading…
Reference in New Issue