mirror of
https://github.com/apache/druid.git
synced 2025-02-16 23:15:16 +00:00
Add metrics for number of segments generated per task in MSQ (#14980)
Add ingest/tombstones/count and ingest/segments/count metrics in MSQ.
This commit is contained in:
parent
75af741a96
commit
7301e60a9c
@ -52,6 +52,7 @@
|
||||
"ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
|
||||
"ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
|
||||
"ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
|
||||
"ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count" },
|
||||
|
||||
"ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
|
||||
"ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
|
||||
|
@ -25,6 +25,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
|
||||
import org.apache.druid.indexing.common.TaskReport;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.server.DruidNode;
|
||||
|
||||
import java.util.Map;
|
||||
@ -36,6 +37,8 @@ import java.util.Map;
|
||||
*/
|
||||
public interface ControllerContext
|
||||
{
|
||||
ServiceEmitter emitter();
|
||||
|
||||
ObjectMapper jsonMapper();
|
||||
|
||||
/**
|
||||
|
@ -1331,6 +1331,7 @@ public class ControllerImpl implements Controller
|
||||
final DataSourceMSQDestination destination =
|
||||
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
|
||||
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
|
||||
int numTombstones = 0;
|
||||
|
||||
if (destination.isReplaceTimeChunks()) {
|
||||
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
|
||||
@ -1345,6 +1346,7 @@ public class ControllerImpl implements Controller
|
||||
destination.getSegmentGranularity()
|
||||
);
|
||||
segmentsWithTombstones.addAll(tombstones);
|
||||
numTombstones = tombstones.size();
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MSQException(e, InsertLockPreemptedFault.instance());
|
||||
@ -1392,6 +1394,10 @@ public class ControllerImpl implements Controller
|
||||
SegmentTransactionalInsertAction.appendAction(segments, null, null)
|
||||
);
|
||||
}
|
||||
|
||||
task.emitMetric(context.emitter(), "ingest/tombstones/count", numTombstones);
|
||||
// Include tombstones in the reported segments count
|
||||
task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.TaskReport;
|
||||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.msq.exec.Controller;
|
||||
import org.apache.druid.msq.exec.ControllerContext;
|
||||
import org.apache.druid.msq.exec.WorkerClient;
|
||||
@ -67,6 +68,12 @@ public class IndexerControllerContext implements ControllerContext
|
||||
this.workerManager = new IndexerWorkerManagerClient(overlordClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServiceEmitter emitter()
|
||||
{
|
||||
return toolbox.getEmitter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMapper jsonMapper()
|
||||
{
|
||||
|
@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.msq.exec.Controller;
|
||||
import org.apache.druid.msq.exec.ControllerContext;
|
||||
import org.apache.druid.msq.exec.Worker;
|
||||
@ -48,6 +49,7 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
|
||||
import org.apache.druid.msq.util.MultiStageQueryContext;
|
||||
import org.apache.druid.query.QueryContext;
|
||||
import org.apache.druid.server.DruidNode;
|
||||
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mockito;
|
||||
@ -82,6 +84,7 @@ public class MSQTestControllerContext implements ControllerContext
|
||||
);
|
||||
private final Injector injector;
|
||||
private final ObjectMapper mapper;
|
||||
private final ServiceEmitter emitter = new NoopServiceEmitter();
|
||||
|
||||
private Controller controller;
|
||||
private Map<String, TaskReport> report = null;
|
||||
@ -215,6 +218,12 @@ public class MSQTestControllerContext implements ControllerContext
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public ServiceEmitter emitter()
|
||||
{
|
||||
return emitter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMapper jsonMapper()
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user