From 58510d826b0728334e17a1fcdb1c0b26c115dc1f Mon Sep 17 00:00:00 2001 From: Slim Date: Tue, 26 Apr 2016 19:07:03 -0500 Subject: [PATCH] fix emit wait time (#2869) --- .../extensions-contrib/graphite.md | 4 +- .../emitter/graphite/GraphiteEmitter.java | 36 ++++++----- .../graphite/GraphiteEmitterConfig.java | 62 +++++++++++++------ .../graphite/GraphiteEmitterModule.java | 2 +- .../graphite/GraphiteEmitterConfigTest.java | 4 +- 5 files changed, 70 insertions(+), 38 deletions(-) diff --git a/docs/content/development/extensions-contrib/graphite.md b/docs/content/development/extensions-contrib/graphite.md index 1598703a2d4..66150e0ad50 100644 --- a/docs/content/development/extensions-contrib/graphite.md +++ b/docs/content/development/extensions-contrib/graphite.md @@ -24,7 +24,9 @@ All the configuration parameters for graphite emitter are under `druid.emitter.g |`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute| |`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`| |`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)| - +|`druid.emitter.graphite.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0| +|`druid.emitter.graphite.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)| + ### Druid to Graphite Event Converter Graphite Event Converter defines a mapping between druid metrics name plus dimensions to a Graphite metric path. diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java index b9c8f177733..d8c877aa046 100644 --- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java +++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java @@ -20,8 +20,6 @@ package io.druid.emitter.graphite; import com.codahale.metrics.graphite.PickledGraphite; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; @@ -30,7 +28,6 @@ import com.metamx.emitter.core.Event; import com.metamx.emitter.service.AlertEvent; import com.metamx.emitter.service.ServiceMetricEvent; -import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutionException; @@ -41,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -52,22 +50,20 @@ public class GraphiteEmitter implements Emitter private final GraphiteEmitterConfig graphiteEmitterConfig; private final List emitterList; private final AtomicBoolean started = new AtomicBoolean(false); - private final ObjectMapper mapper; private final LinkedBlockingQueue eventsQueue; - private final static long DEFAULT_PUT_GET_TIMEOUT = 1000; // default wait for put/get operations on the queue 1 sec private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat("GraphiteEmitter-%s") .build()); // Thread pool of two in order to schedule flush runnable + private AtomicLong countLostEvents = new AtomicLong(0); public GraphiteEmitter( - @NotNull GraphiteEmitterConfig graphiteEmitterConfig, - List emitterList, @NotNull ObjectMapper mapper + GraphiteEmitterConfig graphiteEmitterConfig, + List emitterList ) { this.emitterList = emitterList; - this.mapper = mapper; this.graphiteEmitterConfig = graphiteEmitterConfig; this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter(); this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize()); @@ -103,12 +99,18 @@ public class GraphiteEmitter implements Emitter return; } try { - final boolean isSuccessful = eventsQueue.offer(graphiteEvent, DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS); + final boolean isSuccessful = eventsQueue.offer( + graphiteEvent, + graphiteEmitterConfig.getEmitWaitTime(), + TimeUnit.MILLISECONDS + ); if (!isSuccessful) { - log.error( - "Throwing event [%s] on the floor Graphite queue is full please increase the capacity or/and the consumer frequency", - mapper.writeValueAsString(event) - ); + if (countLostEvents.getAndIncrement() % 1000 == 0) { + log.error( + "Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency", + countLostEvents.get() + ); + } } } catch (InterruptedException e) { @@ -116,9 +118,6 @@ public class GraphiteEmitter implements Emitter Thread.currentThread().interrupt(); } - catch (JsonProcessingException e) { - log.error(e, e.getMessage()); - } } else if (!emitterList.isEmpty() && event instanceof AlertEvent) { for (Emitter emitter : emitterList) { emitter.emit(event); @@ -147,7 +146,10 @@ public class GraphiteEmitter implements Emitter } while (eventsQueue.size() > 0 && !exec.isShutdown()) { try { - final GraphiteEvent graphiteEvent = eventsQueue.poll(DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS); + final GraphiteEvent graphiteEvent = eventsQueue.poll( + graphiteEmitterConfig.getWaitForEventTime(), + TimeUnit.MILLISECONDS + ); if (graphiteEvent != null) { log.debug( "sent [%s] with value [%s] and time [%s]", diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java index db1bcffb551..cd010574feb 100644 --- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java +++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterConfig.java @@ -31,6 +31,7 @@ public class GraphiteEmitterConfig { private final static int DEFAULT_BATCH_SIZE = 100; private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute + private final static long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec @JsonProperty final private String hostname; @@ -48,6 +49,13 @@ public class GraphiteEmitterConfig @JsonProperty final private List alertEmitters; + @JsonProperty + final private Long emitWaitTime; + + //waiting up to the specified wait time if necessary for an event to become available. + @JsonProperty + final private Long waitForEventTime; + @Override public boolean equals(Object o) { @@ -66,40 +74,42 @@ public class GraphiteEmitterConfig if (getBatchSize() != that.getBatchSize()) { return false; } - if (getHostname() != null ? !getHostname().equals(that.getHostname()) : that.getHostname() != null) { + if (!getHostname().equals(that.getHostname())) { return false; } - if (getFlushPeriod() != null ? !getFlushPeriod().equals(that.getFlushPeriod()) : that.getFlushPeriod() != null) { + if (!getFlushPeriod().equals(that.getFlushPeriod())) { return false; } - if (getMaxQueueSize() != null - ? !getMaxQueueSize().equals(that.getMaxQueueSize()) - : that.getMaxQueueSize() != null) { + if (!getMaxQueueSize().equals(that.getMaxQueueSize())) { return false; } - if (getDruidToGraphiteEventConverter() != null - ? !getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter()) - : that.getDruidToGraphiteEventConverter() != null) { + if (!getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())) { return false; } - return !(getAlertEmitters() != null - ? !getAlertEmitters().equals(that.getAlertEmitters()) - : that.getAlertEmitters() != null); + if (getAlertEmitters() != null + ? !getAlertEmitters().equals(that.getAlertEmitters()) + : that.getAlertEmitters() != null) { + return false; + } + if (!getEmitWaitTime().equals(that.getEmitWaitTime())) { + return false; + } + return getWaitForEventTime().equals(that.getWaitForEventTime()); } @Override public int hashCode() { - int result = getHostname() != null ? getHostname().hashCode() : 0; + int result = getHostname().hashCode(); result = 31 * result + getPort(); result = 31 * result + getBatchSize(); - result = 31 * result + (getFlushPeriod() != null ? getFlushPeriod().hashCode() : 0); - result = 31 * result + (getMaxQueueSize() != null ? getMaxQueueSize().hashCode() : 0); - result = 31 * result + (getDruidToGraphiteEventConverter() != null - ? getDruidToGraphiteEventConverter().hashCode() - : 0); + result = 31 * result + getFlushPeriod().hashCode(); + result = 31 * result + getMaxQueueSize().hashCode(); + result = 31 * result + getDruidToGraphiteEventConverter().hashCode(); result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0); + result = 31 * result + getEmitWaitTime().hashCode(); + result = 31 * result + getWaitForEventTime().hashCode(); return result; } @@ -111,9 +121,13 @@ public class GraphiteEmitterConfig @JsonProperty("flushPeriod") Long flushPeriod, @JsonProperty("maxQueueSize") Integer maxQueueSize, @JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter, - @JsonProperty("alertEmitters") List alertEmitters + @JsonProperty("alertEmitters") List alertEmitters, + @JsonProperty("emitWaitTime") Long emitWaitTime, + @JsonProperty("waitForEventTime") Long waitForEventTime ) { + this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime; + this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime; this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters; this.druidToGraphiteEventConverter = Preconditions.checkNotNull( druidToGraphiteEventConverter, @@ -167,4 +181,16 @@ public class GraphiteEmitterConfig { return alertEmitters; } + + @JsonProperty + public Long getEmitWaitTime() + { + return emitWaitTime; + } + + @JsonProperty + public Long getWaitForEventTime() + { + return waitForEventTime; + } } diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterModule.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterModule.java index 438008b0d48..658c413baec 100644 --- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterModule.java +++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitterModule.java @@ -67,6 +67,6 @@ public class GraphiteEmitterModule implements DruidModule } } ); - return new GraphiteEmitter(graphiteEmitterConfig, emitters, mapper); + return new GraphiteEmitter(graphiteEmitterConfig, emitters); } } diff --git a/extensions-contrib/graphite-emitter/src/test/java/io/druid/emitter/graphite/GraphiteEmitterConfigTest.java b/extensions-contrib/graphite-emitter/src/test/java/io/druid/emitter/graphite/GraphiteEmitterConfigTest.java index 8c8287aaafa..c150d9e17c0 100644 --- a/extensions-contrib/graphite-emitter/src/test/java/io/druid/emitter/graphite/GraphiteEmitterConfigTest.java +++ b/extensions-contrib/graphite-emitter/src/test/java/io/druid/emitter/graphite/GraphiteEmitterConfigTest.java @@ -33,7 +33,9 @@ public class GraphiteEmitterConfigTest 1000L, 100, new SendAllGraphiteEventConverter("prefix", true, true), - Collections.EMPTY_LIST + Collections.EMPTY_LIST, + null, + null ); String graphiteEmitterConfigString = mapper.writeValueAsString(graphiteEmitterConfig); GraphiteEmitterConfig graphiteEmitterConfigExpected = mapper.reader(GraphiteEmitterConfig.class).readValue(