fix emit wait time (#2869)

This commit is contained in:
Slim 2016-04-26 19:07:03 -05:00 committed by Fangjin Yang
parent 5658bd99eb
commit 58510d826b
5 changed files with 70 additions and 38 deletions

View File

@ -24,6 +24,8 @@ All the configuration parameters for graphite emitter are under `druid.emitter.g
|`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute| |`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`| |`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)| |`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)|
|`druid.emitter.graphite.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0|
|`druid.emitter.graphite.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)|
### Druid to Graphite Event Converter ### Druid to Graphite Event Converter

View File

@ -20,8 +20,6 @@
package io.druid.emitter.graphite; package io.druid.emitter.graphite;
import com.codahale.metrics.graphite.PickledGraphite; import com.codahale.metrics.graphite.PickledGraphite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -30,7 +28,6 @@ import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import javax.validation.constraints.NotNull;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -41,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -52,22 +50,20 @@ public class GraphiteEmitter implements Emitter
private final GraphiteEmitterConfig graphiteEmitterConfig; private final GraphiteEmitterConfig graphiteEmitterConfig;
private final List<Emitter> emitterList; private final List<Emitter> emitterList;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private final ObjectMapper mapper;
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue; private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
private final static long DEFAULT_PUT_GET_TIMEOUT = 1000; // default wait for put/get operations on the queue 1 sec
private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true) .setDaemon(true)
.setNameFormat("GraphiteEmitter-%s") .setNameFormat("GraphiteEmitter-%s")
.build()); // Thread pool of two in order to schedule flush runnable .build()); // Thread pool of two in order to schedule flush runnable
private AtomicLong countLostEvents = new AtomicLong(0);
public GraphiteEmitter( public GraphiteEmitter(
@NotNull GraphiteEmitterConfig graphiteEmitterConfig, GraphiteEmitterConfig graphiteEmitterConfig,
List<Emitter> emitterList, @NotNull ObjectMapper mapper List<Emitter> emitterList
) )
{ {
this.emitterList = emitterList; this.emitterList = emitterList;
this.mapper = mapper;
this.graphiteEmitterConfig = graphiteEmitterConfig; this.graphiteEmitterConfig = graphiteEmitterConfig;
this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter(); this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter();
this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize()); this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize());
@ -103,12 +99,18 @@ public class GraphiteEmitter implements Emitter
return; return;
} }
try { try {
final boolean isSuccessful = eventsQueue.offer(graphiteEvent, DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS); final boolean isSuccessful = eventsQueue.offer(
if (!isSuccessful) { graphiteEvent,
log.error( graphiteEmitterConfig.getEmitWaitTime(),
"Throwing event [%s] on the floor Graphite queue is full please increase the capacity or/and the consumer frequency", TimeUnit.MILLISECONDS
mapper.writeValueAsString(event)
); );
if (!isSuccessful) {
if (countLostEvents.getAndIncrement() % 1000 == 0) {
log.error(
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency",
countLostEvents.get()
);
}
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
@ -116,9 +118,6 @@ public class GraphiteEmitter implements Emitter
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
catch (JsonProcessingException e) {
log.error(e, e.getMessage());
}
} else if (!emitterList.isEmpty() && event instanceof AlertEvent) { } else if (!emitterList.isEmpty() && event instanceof AlertEvent) {
for (Emitter emitter : emitterList) { for (Emitter emitter : emitterList) {
emitter.emit(event); emitter.emit(event);
@ -147,7 +146,10 @@ public class GraphiteEmitter implements Emitter
} }
while (eventsQueue.size() > 0 && !exec.isShutdown()) { while (eventsQueue.size() > 0 && !exec.isShutdown()) {
try { try {
final GraphiteEvent graphiteEvent = eventsQueue.poll(DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS); final GraphiteEvent graphiteEvent = eventsQueue.poll(
graphiteEmitterConfig.getWaitForEventTime(),
TimeUnit.MILLISECONDS
);
if (graphiteEvent != null) { if (graphiteEvent != null) {
log.debug( log.debug(
"sent [%s] with value [%s] and time [%s]", "sent [%s] with value [%s] and time [%s]",

View File

@ -31,6 +31,7 @@ public class GraphiteEmitterConfig
{ {
private final static int DEFAULT_BATCH_SIZE = 100; private final static int DEFAULT_BATCH_SIZE = 100;
private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
private final static long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
@JsonProperty @JsonProperty
final private String hostname; final private String hostname;
@ -48,6 +49,13 @@ public class GraphiteEmitterConfig
@JsonProperty @JsonProperty
final private List<String> alertEmitters; final private List<String> alertEmitters;
@JsonProperty
final private Long emitWaitTime;
//waiting up to the specified wait time if necessary for an event to become available.
@JsonProperty
final private Long waitForEventTime;
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -66,40 +74,42 @@ public class GraphiteEmitterConfig
if (getBatchSize() != that.getBatchSize()) { if (getBatchSize() != that.getBatchSize()) {
return false; return false;
} }
if (getHostname() != null ? !getHostname().equals(that.getHostname()) : that.getHostname() != null) { if (!getHostname().equals(that.getHostname())) {
return false; return false;
} }
if (getFlushPeriod() != null ? !getFlushPeriod().equals(that.getFlushPeriod()) : that.getFlushPeriod() != null) { if (!getFlushPeriod().equals(that.getFlushPeriod())) {
return false; return false;
} }
if (getMaxQueueSize() != null if (!getMaxQueueSize().equals(that.getMaxQueueSize())) {
? !getMaxQueueSize().equals(that.getMaxQueueSize())
: that.getMaxQueueSize() != null) {
return false; return false;
} }
if (getDruidToGraphiteEventConverter() != null if (!getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())) {
? !getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())
: that.getDruidToGraphiteEventConverter() != null) {
return false; return false;
} }
return !(getAlertEmitters() != null if (getAlertEmitters() != null
? !getAlertEmitters().equals(that.getAlertEmitters()) ? !getAlertEmitters().equals(that.getAlertEmitters())
: that.getAlertEmitters() != null); : that.getAlertEmitters() != null) {
return false;
}
if (!getEmitWaitTime().equals(that.getEmitWaitTime())) {
return false;
}
return getWaitForEventTime().equals(that.getWaitForEventTime());
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
int result = getHostname() != null ? getHostname().hashCode() : 0; int result = getHostname().hashCode();
result = 31 * result + getPort(); result = 31 * result + getPort();
result = 31 * result + getBatchSize(); result = 31 * result + getBatchSize();
result = 31 * result + (getFlushPeriod() != null ? getFlushPeriod().hashCode() : 0); result = 31 * result + getFlushPeriod().hashCode();
result = 31 * result + (getMaxQueueSize() != null ? getMaxQueueSize().hashCode() : 0); result = 31 * result + getMaxQueueSize().hashCode();
result = 31 * result + (getDruidToGraphiteEventConverter() != null result = 31 * result + getDruidToGraphiteEventConverter().hashCode();
? getDruidToGraphiteEventConverter().hashCode()
: 0);
result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0); result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0);
result = 31 * result + getEmitWaitTime().hashCode();
result = 31 * result + getWaitForEventTime().hashCode();
return result; return result;
} }
@ -111,9 +121,13 @@ public class GraphiteEmitterConfig
@JsonProperty("flushPeriod") Long flushPeriod, @JsonProperty("flushPeriod") Long flushPeriod,
@JsonProperty("maxQueueSize") Integer maxQueueSize, @JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter, @JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter,
@JsonProperty("alertEmitters") List<String> alertEmitters @JsonProperty("alertEmitters") List<String> alertEmitters,
@JsonProperty("emitWaitTime") Long emitWaitTime,
@JsonProperty("waitForEventTime") Long waitForEventTime
) )
{ {
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters; this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters;
this.druidToGraphiteEventConverter = Preconditions.checkNotNull( this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
druidToGraphiteEventConverter, druidToGraphiteEventConverter,
@ -167,4 +181,16 @@ public class GraphiteEmitterConfig
{ {
return alertEmitters; return alertEmitters;
} }
@JsonProperty
public Long getEmitWaitTime()
{
return emitWaitTime;
}
@JsonProperty
public Long getWaitForEventTime()
{
return waitForEventTime;
}
} }

View File

@ -67,6 +67,6 @@ public class GraphiteEmitterModule implements DruidModule
} }
} }
); );
return new GraphiteEmitter(graphiteEmitterConfig, emitters, mapper); return new GraphiteEmitter(graphiteEmitterConfig, emitters);
} }
} }

View File

@ -33,7 +33,9 @@ public class GraphiteEmitterConfigTest
1000L, 1000L,
100, 100,
new SendAllGraphiteEventConverter("prefix", true, true), new SendAllGraphiteEventConverter("prefix", true, true),
Collections.EMPTY_LIST Collections.EMPTY_LIST,
null,
null
); );
String graphiteEmitterConfigString = mapper.writeValueAsString(graphiteEmitterConfig); String graphiteEmitterConfigString = mapper.writeValueAsString(graphiteEmitterConfig);
GraphiteEmitterConfig graphiteEmitterConfigExpected = mapper.reader(GraphiteEmitterConfig.class).readValue( GraphiteEmitterConfig graphiteEmitterConfigExpected = mapper.reader(GraphiteEmitterConfig.class).readValue(