mirror of https://github.com/apache/druid.git
Merge pull request #2152 from metamx/add-taskId
Add taskId to realtimeMetrics
This commit is contained in:
commit
7ffa706655
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.parsers.ParseException;
|
||||
|
@ -39,6 +40,7 @@ import io.druid.indexing.common.TaskToolbox;
|
|||
import io.druid.indexing.common.actions.LockAcquireAction;
|
||||
import io.druid.indexing.common.actions.LockReleaseAction;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.query.DruidMetrics;
|
||||
import io.druid.query.FinalizeResultsQueryRunner;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
|
@ -274,7 +276,12 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
realtimeIOConfig,
|
||||
tuningConfig
|
||||
);
|
||||
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
|
||||
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
|
||||
ImmutableList.of(fireDepartment),
|
||||
ImmutableMap.of(
|
||||
DruidMetrics.TASK_ID, new String[]{getId()}
|
||||
)
|
||||
);
|
||||
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
|
||||
|
||||
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
|
||||
|
@ -444,7 +451,7 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
/**
|
||||
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
|
||||
* abruptly stopping.
|
||||
* <p/>
|
||||
* <p>
|
||||
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
|
||||
*/
|
||||
private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
|
||||
|
|
|
@ -19,12 +19,14 @@
|
|||
|
||||
package io.druid.segment.realtime;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import com.metamx.metrics.AbstractMonitor;
|
||||
import com.metamx.metrics.MonitorUtils;
|
||||
import io.druid.query.DruidMetrics;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -38,12 +40,19 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
|
|||
|
||||
private final Map<FireDepartment, FireDepartmentMetrics> previousValues;
|
||||
private final List<FireDepartment> fireDepartments;
|
||||
private final Map<String, String[]> dimensions;
|
||||
|
||||
@Inject
|
||||
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments)
|
||||
{
|
||||
this(fireDepartments, ImmutableMap.<String, String[]>of());
|
||||
}
|
||||
|
||||
public RealtimeMetricsMonitor(List<FireDepartment> fireDepartments, Map<String, String[]> dimensions)
|
||||
{
|
||||
this.fireDepartments = fireDepartments;
|
||||
this.previousValues = Maps.newHashMap();
|
||||
this.dimensions = ImmutableMap.copyOf(dimensions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,6 +68,7 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
|
|||
|
||||
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
|
||||
.setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource());
|
||||
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
|
||||
|
||||
final long thrownAway = metrics.thrownAway() - previous.thrownAway();
|
||||
if (thrownAway > 0) {
|
||||
|
|
Loading…
Reference in New Issue