Graphite emitter - add plaintext protocol (#4265)

* Graphite emitter - add plaintext protocol. Configurable option of replacing slash to dot in metric name.

* Graphite emitter - fix misspelling in docs.

* Graphite emitter - extend docs.

* Graphite emitter - fix code style.
This commit is contained in:
Bartosz Ługowski 2017-08-29 15:23:06 +02:00 committed by Slim
parent beecb9e210
commit 8dddccc687
8 changed files with 115 additions and 24 deletions

View File

@ -9,7 +9,8 @@ To use this extension, make sure to [include](../../operations/including-extensi
## Introduction
This extension emits druid metrics to a graphite carbon server.
Events are sent after been [pickled](http://graphite.readthedocs.org/en/latest/feeding-carbon.html#the-pickle-protocol); the size of the batch is configurable.
Metrics can be sent by using [plaintext](http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol) or [pickle](http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-pickle-protocol) protocol.
The pickle protocol is more efficient and supports sending batches of metrics (plaintext protocol send only one metric) in one request; batch size is configurable.
## Configuration
@ -19,8 +20,9 @@ All the configuration parameters for graphite emitter are under `druid.emitter.g
|--------|-----------|---------|-------|
|`druid.emitter.graphite.hostname`|The hostname of the graphite server.|yes|none|
|`druid.emitter.graphite.port`|The port of the graphite server.|yes|none|
|`druid.emitter.graphite.batchSize`|Number of events to send as one batch.|no|100|
|`druid.emitter.graphite.eventConverter`| Filter and converter of druid events to graphite event(please see next section). |yes|none|
|`druid.emitter.graphite.batchSize`|Number of events to send as one batch (only for pickle protocol)|no|100|
|`druid.emitter.graphite.protocol`|Graphite protocol; available protocols: pickle, plaintext.|no|pickle|
|`druid.emitter.graphite.eventConverter`| Filter and converter of druid events to graphite event (please see next section).|yes|none|
|`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)|
@ -52,10 +54,15 @@ The path will be in the form `<namespacePrefix>.[<druid service name>].[<druid h
User has control of `<namespacePrefix>.[<druid service name>].[<druid hostname>].`
You can omit the hostname by setting `ignoreHostname=true`
`druid.SERVICE_NAME.dataSourceName.queryType.query.time`
`druid.SERVICE_NAME.dataSourceName.queryType.query/time`
You can omit the service name by setting `ignoreServiceName=true`
`druid.HOSTNAME.dataSourceName.queryType.query.time`
`druid.HOSTNAME.dataSourceName.queryType.query/time`
Elements in metric name by default are separated by "/", so graphite will create all metrics on one level. If you want to have metrics in the tree structure, you have to set `replaceSlashWithDot=true`
Original: `druid.HOSTNAME.dataSourceName.queryType.query/time`
Changed: `druid.HOSTNAME.dataSourceName.queryType.query.time`
```json
@ -70,7 +77,7 @@ Same as for the `all` converter user has control of `<namespacePrefix>.[<druid s
White-list based converter comes with the following default white list map located under resources in `./src/main/resources/defaultWhiteListMap.json`
Although user can override the default white list map by supplying a property called `mapPath`.
This property is a String containing the path for the file containing **white list map Json object**.
This property is a String containing the path for the file containing **white list map Json object**.
For example the following converter will read the map from the file `/pathPrefix/fileName.json`.
```json

View File

@ -19,6 +19,8 @@
package io.druid.emitter.graphite;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteSender;
import com.codahale.metrics.graphite.PickledGraphite;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.emitter.core.Emitter;
@ -136,17 +138,32 @@ public class GraphiteEmitter implements Emitter
private class ConsumerRunnable implements Runnable
{
@Override
public void run()
private final GraphiteSender graphite;
public ConsumerRunnable()
{
try (PickledGraphite pickledGraphite = new PickledGraphite(
if (graphiteEmitterConfig.getProtocol().equals(GraphiteEmitterConfig.PLAINTEXT_PROTOCOL)) {
graphite = new Graphite(
graphiteEmitterConfig.getHostname(),
graphiteEmitterConfig.getPort()
);
} else {
graphite = new PickledGraphite(
graphiteEmitterConfig.getHostname(),
graphiteEmitterConfig.getPort(),
graphiteEmitterConfig.getBatchSize()
)) {
if (!pickledGraphite.isConnected()) {
);
}
log.info("Using %s protocol.", graphiteEmitterConfig.getProtocol());
}
@Override
public void run()
{
try {
if (!graphite.isConnected()) {
log.info("trying to connect to graphite server");
pickledGraphite.connect();
graphite.connect();
}
while (eventsQueue.size() > 0 && !exec.isShutdown()) {
try {
@ -161,7 +178,7 @@ public class GraphiteEmitter implements Emitter
graphiteEvent.getValue(),
graphiteEvent.getTimestamp()
);
pickledGraphite.send(
graphite.send(
graphiteEvent.getEventPath(),
graphiteEvent.getValue(),
graphiteEvent.getTimestamp()
@ -176,9 +193,9 @@ public class GraphiteEmitter implements Emitter
} else if (e instanceof SocketException) {
// This is antagonistic to general Closeable contract in Java,
// it is needed to allow re-connection in case of the socket is closed due long period of inactivity
pickledGraphite.close();
graphite.close();
log.warn("Trying to re-connect to graphite server");
pickledGraphite.connect();
graphite.connect();
}
}
}
@ -219,8 +236,17 @@ public class GraphiteEmitter implements Emitter
}
protected static String sanitize(String namespace)
{
return sanitize(namespace, false);
}
protected static String sanitize(String namespace, Boolean replaceSlashToDot)
{
Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");
return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
String sanitizedNamespace = DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
if (replaceSlashToDot) {
sanitizedNamespace = sanitizedNamespace.replace("/", ".");
}
return sanitizedNamespace;
}
}

View File

@ -29,6 +29,8 @@ import java.util.List;
public class GraphiteEmitterConfig
{
public final static String PLAINTEXT_PROTOCOL = "plaintext";
public final static String PICKLE_PROTOCOL = "pickle";
private final static int DEFAULT_BATCH_SIZE = 100;
private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
private final static long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
@ -40,18 +42,17 @@ public class GraphiteEmitterConfig
@JsonProperty
final private int batchSize;
@JsonProperty
final private String protocol;
@JsonProperty
final private Long flushPeriod;
@JsonProperty
final private Integer maxQueueSize;
@JsonProperty("eventConverter")
final private DruidToGraphiteEventConverter druidToGraphiteEventConverter;
@JsonProperty
final private List<String> alertEmitters;
@JsonProperty
final private Long emitWaitTime;
//waiting up to the specified wait time if necessary for an event to become available.
@JsonProperty
final private Long waitForEventTime;
@ -74,6 +75,9 @@ public class GraphiteEmitterConfig
if (getBatchSize() != that.getBatchSize()) {
return false;
}
if (!getProtocol().equals(that.getProtocol())) {
return false;
}
if (!getHostname().equals(that.getHostname())) {
return false;
}
@ -104,6 +108,7 @@ public class GraphiteEmitterConfig
int result = getHostname().hashCode();
result = 31 * result + getPort();
result = 31 * result + getBatchSize();
result = 31 * result + getProtocol().hashCode();
result = 31 * result + getFlushPeriod().hashCode();
result = 31 * result + getMaxQueueSize().hashCode();
result = 31 * result + getDruidToGraphiteEventConverter().hashCode();
@ -118,6 +123,7 @@ public class GraphiteEmitterConfig
@JsonProperty("hostname") String hostname,
@JsonProperty("port") Integer port,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("protocol") String protocol,
@JsonProperty("flushPeriod") Long flushPeriod,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter,
@ -138,6 +144,7 @@ public class GraphiteEmitterConfig
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = Preconditions.checkNotNull(port, "port can not be null");
this.batchSize = (batchSize == null) ? DEFAULT_BATCH_SIZE : batchSize;
this.protocol = (protocol == null) ? PICKLE_PROTOCOL : protocol;
}
@JsonProperty
@ -158,6 +165,12 @@ public class GraphiteEmitterConfig
return batchSize;
}
@JsonProperty
public String getProtocol()
{
return protocol;
}
@JsonProperty
public Integer getMaxQueueSize()
{

View File

@ -53,6 +53,9 @@ public class SendAllGraphiteEventConverter implements DruidToGraphiteEventConver
@JsonProperty
private final String namespacePrefix;
@JsonProperty
private final boolean replaceSlashWithDot;
@JsonProperty
public String getNamespacePrefix()
{
@ -71,15 +74,23 @@ public class SendAllGraphiteEventConverter implements DruidToGraphiteEventConver
return ignoreHostname;
}
@JsonProperty
public boolean replaceSlashWithDot()
{
return replaceSlashWithDot;
}
@JsonCreator
public SendAllGraphiteEventConverter(
@JsonProperty("namespacePrefix") String namespacePrefix,
@JsonProperty("ignoreHostname") Boolean ignoreHostname,
@JsonProperty("ignoreServiceName") Boolean ignoreServiceName
@JsonProperty("ignoreServiceName") Boolean ignoreServiceName,
@JsonProperty("replaceSlashWithDot") Boolean replaceSlashWithDot
)
{
this.ignoreHostname = ignoreHostname == null ? false : ignoreHostname;
this.ignoreServiceName = ignoreServiceName == null ? false : ignoreServiceName;
this.replaceSlashWithDot = replaceSlashWithDot == null ? false : replaceSlashWithDot;
this.namespacePrefix = Preconditions.checkNotNull(namespacePrefix, "namespace prefix can not be null");
}
@ -100,7 +111,7 @@ public class SendAllGraphiteEventConverter implements DruidToGraphiteEventConver
metricPathBuilder.add(GraphiteEmitter.sanitize(String.valueOf(serviceMetricEvent.getUserDims()
.get(dimName))));
}
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getMetric()));
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getMetric(), this.replaceSlashWithDot()));
return new GraphiteEvent(
Joiner.on(".").join(metricPathBuilder.build()),
@ -127,6 +138,9 @@ public class SendAllGraphiteEventConverter implements DruidToGraphiteEventConver
if (isIgnoreServiceName() != that.isIgnoreServiceName()) {
return false;
}
if (replaceSlashWithDot() != that.replaceSlashWithDot()) {
return false;
}
return getNamespacePrefix().equals(that.getNamespacePrefix());
}
@ -136,6 +150,7 @@ public class SendAllGraphiteEventConverter implements DruidToGraphiteEventConver
{
int result = (isIgnoreHostname() ? 1 : 0);
result = 31 * result + (isIgnoreServiceName() ? 1 : 0);
result = 31 * result + (replaceSlashWithDot() ? 1 : 0);
result = 31 * result + getNamespacePrefix().hashCode();
return result;
}

View File

@ -69,6 +69,9 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
@JsonProperty
private final String namespacePrefix;
@JsonProperty
private final boolean replaceSlashWithDot;
@JsonProperty
private final String mapPath;
@ -79,6 +82,7 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
@JsonProperty("namespacePrefix") String namespacePrefix,
@JsonProperty("ignoreHostname") Boolean ignoreHostname,
@JsonProperty("ignoreServiceName") Boolean ignoreServiceName,
@JsonProperty("replaceSlashWithDot") Boolean replaceSlashWithDot,
@JsonProperty("mapPath") String mapPath,
@JacksonInject ObjectMapper mapper
)
@ -88,6 +92,7 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
this.whiteListDimsMapper = readMap(this.mapPath);
this.ignoreHostname = ignoreHostname == null ? false : ignoreHostname;
this.ignoreServiceName = ignoreServiceName == null ? false : ignoreServiceName;
this.replaceSlashWithDot = replaceSlashWithDot == null ? false : replaceSlashWithDot;
this.namespacePrefix = Preconditions.checkNotNull(namespacePrefix, "namespace prefix can not be null");
}
@ -109,6 +114,12 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
return namespacePrefix;
}
@JsonProperty
public boolean replaceSlashWithDot()
{
return replaceSlashWithDot;
}
public ImmutableSortedMap<String, ImmutableSet<String>> getWhiteListDimsMapper()
{
return whiteListDimsMapper;
@ -200,7 +211,7 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getHost()));
}
metricPathBuilder.addAll(this.getOrderedDimValues(serviceMetricEvent));
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getMetric()));
metricPathBuilder.add(GraphiteEmitter.sanitize(serviceMetricEvent.getMetric(), this.replaceSlashWithDot()));
final GraphiteEvent graphiteEvent = new GraphiteEvent(
Joiner.on(".").join(metricPathBuilder.build()),
@ -228,6 +239,9 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
if (isIgnoreServiceName() != that.isIgnoreServiceName()) {
return false;
}
if (replaceSlashWithDot() != that.replaceSlashWithDot()) {
return false;
}
if (!getNamespacePrefix().equals(that.getNamespacePrefix())) {
return false;
}
@ -240,6 +254,7 @@ public class WhiteListBasedConverter implements DruidToGraphiteEventConverter
{
int result = (isIgnoreHostname() ? 1 : 0);
result = 31 * result + (isIgnoreServiceName() ? 1 : 0);
result = 31 * result + (replaceSlashWithDot() ? 1 : 0);
result = 31 * result + getNamespacePrefix().hashCode();
result = 31 * result + (mapPath != null ? mapPath.hashCode() : 0);
return result;

View File

@ -31,4 +31,11 @@ public class DruidToWhiteListBasedConverterTest
String test = "host name.yahoo.com:8080";
Assert.assertEquals("host_name_yahoo_com:8080", GraphiteEmitter.sanitize(test));
}
@Test
public void testSanitizeAndReplaceSlashWithDot()
{
String test = "query/cache/delta/hitRate";
Assert.assertEquals("query.cache.delta.hitRate", GraphiteEmitter.sanitize(test, true));
}
}

View File

@ -49,9 +49,10 @@ public class GraphiteEmitterConfigTest
"hostname",
8080,
1000,
GraphiteEmitterConfig.PICKLE_PROTOCOL,
1000L,
100,
new SendAllGraphiteEventConverter("prefix", true, true),
new SendAllGraphiteEventConverter("prefix", true, true, false),
Collections.EMPTY_LIST,
null,
null
@ -66,7 +67,12 @@ public class GraphiteEmitterConfigTest
@Test
public void testSerDeserDruidToGraphiteEventConverter() throws IOException
{
SendAllGraphiteEventConverter sendAllGraphiteEventConverter = new SendAllGraphiteEventConverter("prefix", true, true);
SendAllGraphiteEventConverter sendAllGraphiteEventConverter = new SendAllGraphiteEventConverter(
"prefix",
true,
true,
false
);
String noopGraphiteEventConverterString = mapper.writeValueAsString(sendAllGraphiteEventConverter);
DruidToGraphiteEventConverter druidToGraphiteEventConverter = mapper.reader(DruidToGraphiteEventConverter.class)
.readValue(noopGraphiteEventConverterString);
@ -76,6 +82,7 @@ public class GraphiteEmitterConfigTest
"prefix",
true,
true,
false,
"",
new DefaultObjectMapper()
);

View File

@ -41,6 +41,7 @@ public class WhiteListBasedConverterTest
prefix,
false,
false,
false,
null,
new DefaultObjectMapper()
);