Add taskId to realtimeMetrics

Add task Id to Realtime Metrics
This commit is contained in:
Nishant 2015-12-23 18:05:25 +05:30
parent 7e64d5179f
commit 978a3fd8ae
2 changed files with 19 additions and 2 deletions

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.parsers.ParseException; import com.metamx.common.parsers.ParseException;
@ -39,6 +40,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockReleaseAction; import io.druid.indexing.common.actions.LockReleaseAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.DruidMetrics;
import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
@ -274,7 +276,12 @@ public class RealtimeIndexTask extends AbstractTask
realtimeIOConfig, realtimeIOConfig,
tuningConfig tuningConfig
); );
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment)); final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
ImmutableList.of(fireDepartment),
ImmutableMap.of(
DruidMetrics.TASK_ID, new String[]{getId()}
)
);
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate(); this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means // NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
@ -444,7 +451,7 @@ public class RealtimeIndexTask extends AbstractTask
/** /**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than * Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping. * abruptly stopping.
* <p/> * <p>
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. * This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*/ */
private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)

View File

@ -19,12 +19,14 @@
package io.druid.segment.realtime; package io.druid.segment.realtime;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor; import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorUtils;
import io.druid.query.DruidMetrics; import io.druid.query.DruidMetrics;
import java.util.List; import java.util.List;
@ -38,12 +40,19 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
private final Map<FireDepartment, FireDepartmentMetrics> previousValues; private final Map<FireDepartment, FireDepartmentMetrics> previousValues;
private final List<FireDepartment> fireDepartments; private final List<FireDepartment> fireDepartments;
private final Map<String, String[]> dimensions;
@Inject @Inject
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments) public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments)
{
this(fireDepartments, ImmutableMap.<String, String[]>of());
}
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments, Map<String, String[]> dimensions)
{ {
this.fireDepartments = fireDepartments; this.fireDepartments = fireDepartments;
this.previousValues = Maps.newHashMap(); this.previousValues = Maps.newHashMap();
this.dimensions = ImmutableMap.copyOf(dimensions);
} }
@Override @Override
@ -59,6 +68,7 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder() final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource()); .setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
final long thrownAway = metrics.thrownAway() - previous.thrownAway(); final long thrownAway = metrics.thrownAway() - previous.thrownAway();
if (thrownAway > 0) { if (thrownAway > 0) {