From 978a3fd8aec19fbc92f29b017d0409944ff196bc Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 23 Dec 2015 18:05:25 +0530 Subject: [PATCH] Add taskId to realtimeMetrics Add task Id to Realtime Metrics --- .../druid/indexing/common/task/RealtimeIndexTask.java | 11 +++++++++-- .../segment/realtime/RealtimeMetricsMonitor.java | 10 ++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 87f07a42135..9a53a3a6392 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.parsers.ParseException; @@ -39,6 +40,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockReleaseAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.query.DruidMetrics; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -274,7 +276,12 @@ public class RealtimeIndexTask extends AbstractTask realtimeIOConfig, tuningConfig ); - final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment)); + final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor( + ImmutableList.of(fireDepartment), + ImmutableMap.of( + DruidMetrics.TASK_ID, new String[]{getId()} + ) + ); this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate(); // NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means @@ -444,7 +451,7 @@ public class RealtimeIndexTask extends AbstractTask /** * Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than * abruptly stopping. - *

+ *

* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. */ private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 5c635632323..42a22df89f8 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -19,12 +19,14 @@ package io.druid.segment.realtime; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.AbstractMonitor; +import com.metamx.metrics.MonitorUtils; import io.druid.query.DruidMetrics; import java.util.List; @@ -38,12 +40,19 @@ public class RealtimeMetricsMonitor extends AbstractMonitor private final Map previousValues; private final List fireDepartments; + private final Map dimensions; @Inject public RealtimeMetricsMonitor(List fireDepartments) + { + this(fireDepartments, ImmutableMap.of()); + } + + public RealtimeMetricsMonitor(List fireDepartments, Map dimensions) { this.fireDepartments = fireDepartments; this.previousValues = Maps.newHashMap(); + this.dimensions = ImmutableMap.copyOf(dimensions); } @Override @@ -59,6 +68,7 @@ public class RealtimeMetricsMonitor extends AbstractMonitor final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder() .setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); final long thrownAway = metrics.thrownAway() - previous.thrownAway(); if (thrownAway > 0) {