Add RequestLogEvent emitters config to graphite-emitter (#4678)

* Add RequestLogEvent emitters config to graphite-emitter

* eagerly compute emitter list

* use lambdas

* checkstyle
This commit is contained in:
Jonathan Wei 2017-09-22 06:14:32 -07:00 committed by Slim
parent e267f3901b
commit 09fcb75583
7 changed files with 66 additions and 26 deletions

View File

@ -25,10 +25,19 @@ All the configuration parameters for graphite emitter are under `druid.emitter.g
|`druid.emitter.graphite.eventConverter`| Filter and converter of druid events to graphite event (please see next section).|yes|none| |`druid.emitter.graphite.eventConverter`| Filter and converter of druid events to graphite event (please see next section).|yes|none|
|`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute| |`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`| |`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)| |`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. This is a JSON list of emitter names, e.g. `["logging", "http"]`|no| empty list (no forwarding)|
|`druid.emitter.graphite.requestLogEmitters`| List of emitters where request logs (i.e., query logging events sent to emitters when `druid.request.logging.type` is set to `emitter`) will be forwarded to. This is a JSON list of emitter names, e.g. `["logging", "http"]`|no| empty list (no forwarding)|
|`druid.emitter.graphite.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0| |`druid.emitter.graphite.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0|
|`druid.emitter.graphite.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)| |`druid.emitter.graphite.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)|
### Supported event types
The graphite emitter only emits service metric events to graphite (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
Alerts and request logs are not sent to graphite. These event types are not well represented in Graphite, which is more suited for timeseries views on numeric metrics, vs. storing non-numeric log events.
Instead, alerts and request logs are optionally forwarded to other emitter implementations, specified by `druid.emitter.graphite.alertEmitters` and `druid.emitter.graphite.requestLogEmitters` respectively.
### Druid to Graphite Event Converter ### Druid to Graphite Event Converter
Graphite Event Converter defines a mapping between druid metrics name plus dimensions to a Graphite metric path. Graphite Event Converter defines a mapping between druid metrics name plus dimensions to a Graphite metric path.

View File

@ -47,6 +47,12 @@
<version>${project.parent.version}</version> <version>${project.parent.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>emitter</artifactId> <artifactId>emitter</artifactId>
@ -73,13 +79,6 @@
<version>1.0.4</version> <version>1.0.4</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId> <artifactId>druid-processing</artifactId>

View File

@ -30,6 +30,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.server.log.EmittingRequestLogger;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException; import java.net.SocketException;
@ -52,7 +53,8 @@ public class GraphiteEmitter implements Emitter
private final DruidToGraphiteEventConverter graphiteEventConverter; private final DruidToGraphiteEventConverter graphiteEventConverter;
private final GraphiteEmitterConfig graphiteEmitterConfig; private final GraphiteEmitterConfig graphiteEmitterConfig;
private final List<Emitter> emitterList; private final List<Emitter> alertEmitters;
private final List<Emitter> requestLogEmitters;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue; private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
@ -64,10 +66,12 @@ public class GraphiteEmitter implements Emitter
public GraphiteEmitter( public GraphiteEmitter(
GraphiteEmitterConfig graphiteEmitterConfig, GraphiteEmitterConfig graphiteEmitterConfig,
List<Emitter> emitterList List<Emitter> alertEmitters,
List<Emitter> requestLogEmitters
) )
{ {
this.emitterList = emitterList; this.alertEmitters = alertEmitters;
this.requestLogEmitters = requestLogEmitters;
this.graphiteEmitterConfig = graphiteEmitterConfig; this.graphiteEmitterConfig = graphiteEmitterConfig;
this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter(); this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter();
this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize()); this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize());
@ -121,8 +125,12 @@ public class GraphiteEmitter implements Emitter
log.error(e, "got interrupted with message [%s]", e.getMessage()); log.error(e, "got interrupted with message [%s]", e.getMessage());
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} else if (!emitterList.isEmpty() && event instanceof AlertEvent) { } else if (event instanceof EmittingRequestLogger.RequestLogEvent) {
for (Emitter emitter : emitterList) { for (Emitter emitter : requestLogEmitters) {
emitter.emit(event);
}
} else if (!alertEmitters.isEmpty() && event instanceof AlertEvent) {
for (Emitter emitter : alertEmitters) {
emitter.emit(event); emitter.emit(event);
} }
} else if (event instanceof AlertEvent) { } else if (event instanceof AlertEvent) {

View File

@ -51,6 +51,9 @@ public class GraphiteEmitterConfig
final private DruidToGraphiteEventConverter druidToGraphiteEventConverter; final private DruidToGraphiteEventConverter druidToGraphiteEventConverter;
@JsonProperty @JsonProperty
final private List<String> alertEmitters; final private List<String> alertEmitters;
@JsonProperty
final private List<String> requestLogEmitters;
@JsonProperty @JsonProperty
final private Long emitWaitTime; final private Long emitWaitTime;
//waiting up to the specified wait time if necessary for an event to become available. //waiting up to the specified wait time if necessary for an event to become available.
@ -95,6 +98,11 @@ public class GraphiteEmitterConfig
: that.getAlertEmitters() != null) { : that.getAlertEmitters() != null) {
return false; return false;
} }
if (getRequestLogEmitters() != null
? !getRequestLogEmitters().equals(that.getRequestLogEmitters())
: that.getRequestLogEmitters() != null) {
return false;
}
if (!getEmitWaitTime().equals(that.getEmitWaitTime())) { if (!getEmitWaitTime().equals(that.getEmitWaitTime())) {
return false; return false;
} }
@ -113,6 +121,7 @@ public class GraphiteEmitterConfig
result = 31 * result + getMaxQueueSize().hashCode(); result = 31 * result + getMaxQueueSize().hashCode();
result = 31 * result + getDruidToGraphiteEventConverter().hashCode(); result = 31 * result + getDruidToGraphiteEventConverter().hashCode();
result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0); result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0);
result = 31 * result + (getRequestLogEmitters() != null ? getRequestLogEmitters().hashCode() : 0);
result = 31 * result + getEmitWaitTime().hashCode(); result = 31 * result + getEmitWaitTime().hashCode();
result = 31 * result + getWaitForEventTime().hashCode(); result = 31 * result + getWaitForEventTime().hashCode();
return result; return result;
@ -128,6 +137,7 @@ public class GraphiteEmitterConfig
@JsonProperty("maxQueueSize") Integer maxQueueSize, @JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter, @JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter,
@JsonProperty("alertEmitters") List<String> alertEmitters, @JsonProperty("alertEmitters") List<String> alertEmitters,
@JsonProperty("requestLogEmitters") List<String> requestLogEmitters,
@JsonProperty("emitWaitTime") Long emitWaitTime, @JsonProperty("emitWaitTime") Long emitWaitTime,
@JsonProperty("waitForEventTime") Long waitForEventTime @JsonProperty("waitForEventTime") Long waitForEventTime
) )
@ -135,6 +145,7 @@ public class GraphiteEmitterConfig
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime; this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime; this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters; this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters;
this.requestLogEmitters = requestLogEmitters == null ? Collections.<String>emptyList() : requestLogEmitters;
this.druidToGraphiteEventConverter = Preconditions.checkNotNull( this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
druidToGraphiteEventConverter, druidToGraphiteEventConverter,
"Event converter can not ne null dude" "Event converter can not ne null dude"
@ -195,6 +206,12 @@ public class GraphiteEmitterConfig
return alertEmitters; return alertEmitters;
} }
@JsonProperty
public List<String> getRequestLogEmitters()
{
return requestLogEmitters;
}
@JsonProperty @JsonProperty
public Long getEmitWaitTime() public Long getEmitWaitTime()
{ {

View File

@ -21,7 +21,7 @@ package io.druid.emitter.graphite;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Injector; import com.google.inject.Injector;
@ -57,17 +57,23 @@ public class GraphiteEmitterModule implements DruidModule
@Named(EMITTER_TYPE) @Named(EMITTER_TYPE)
public Emitter getEmitter(GraphiteEmitterConfig graphiteEmitterConfig, ObjectMapper mapper, final Injector injector) public Emitter getEmitter(GraphiteEmitterConfig graphiteEmitterConfig, ObjectMapper mapper, final Injector injector)
{ {
List<Emitter> emitters = Lists.transform( List<Emitter> emitters = ImmutableList.copyOf(
graphiteEmitterConfig.getAlertEmitters(), Lists.transform(
new Function<String, Emitter>() graphiteEmitterConfig.getAlertEmitters(),
{ alertEmitterName -> {
@Override return injector.getInstance(Key.get(Emitter.class, Names.named(alertEmitterName)));
public Emitter apply(String s) }
{ )
return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
}
}
); );
return new GraphiteEmitter(graphiteEmitterConfig, emitters);
List<Emitter> requestLogEmitters = ImmutableList.copyOf(
Lists.transform(
graphiteEmitterConfig.getRequestLogEmitters(),
requestLogEmitterName -> {
return injector.getInstance(Key.get(Emitter.class, Names.named(requestLogEmitterName)));
}
)
);
return new GraphiteEmitter(graphiteEmitterConfig, emitters, requestLogEmitters);
} }
} }

View File

@ -54,6 +54,7 @@ public class GraphiteEmitterConfigTest
100, 100,
new SendAllGraphiteEventConverter("prefix", true, true, false), new SendAllGraphiteEventConverter("prefix", true, true, false),
Collections.EMPTY_LIST, Collections.EMPTY_LIST,
Collections.EMPTY_LIST,
null, null,
null null
); );

View File

@ -59,7 +59,7 @@ public class EmittingRequestLogger implements RequestLogger
'}'; '}';
} }
private static class RequestLogEvent implements Event public static class RequestLogEvent implements Event
{ {
final ImmutableMap<String, String> serviceDimensions; final ImmutableMap<String, String> serviceDimensions;
final String feed; final String feed;