Merge branch 'quidem-record' into quidem-msq

Zoltan Haindrich 2024-08-05 13:00:59 +00:00
commit 841ab462dd
16 changed files with 507 additions and 44 deletions

View File

@ -77,6 +77,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rows`|Number of rows materialized by the subquery's results. |`id`, `subqueryId`| Varies |
|`subquery/bytes`|Number of bytes materialized by the subquery's results. This metric is only emitted if the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries).|`id`, `subqueryId`| Varies |
|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
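The two new `subquery/rows` and `subquery/bytes` metrics above are emitted by ClientQuerySegmentWalker when it materializes a subquery (see the change to that class later in this commit). A minimal sketch of the emission, assuming a ServiceEmitter is at hand; the helper class and method names are illustrative, only the metric names and dimensions come from the commit:

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Query;

public class SubqueryMetricsSketch
{
  // Emits subquery/rows and subquery/bytes for one materialized subquery,
  // tagged with the parent query id and the subquery id.
  public static <T> void emitSubqueryMetrics(ServiceEmitter emitter, Query<T> query, long rows, long bytes)
  {
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension(DruidMetrics.ID, query.getId())
                          .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
                          .setMetric("subquery/rows", rows)
    );
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension(DruidMetrics.ID, query.getId())
                          .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
                          .setMetric("subquery/bytes", bytes)
    );
  }
}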
@ -297,6 +299,8 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies|
|`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/failed/count`|Number of tasks that failed on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/success/count`|Number of tasks that succeeded on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|

## Shuffle metrics (Native parallel task)

View File

@ -74,8 +74,8 @@
"worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/task/success/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
"worker/taskSlot/total/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/total/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
"worker/taskSlot/used/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/used/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },

View File

@ -113,7 +113,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -588,19 +587,19 @@ public class SqlStatementResource
MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction);
SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus);

-Supplier<Optional<MSQTaskReportPayload>> msqTaskReportPayloadSupplier = () -> {
+MSQTaskReportPayload taskReportPayload = null;
+if (detail || SqlStatementState.FAILED == sqlStatementState) {
try {
-return Optional.ofNullable(SqlStatementResourceHelper.getPayload(
+taskReportPayload = SqlStatementResourceHelper.getPayload(
contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
-));
+);
}
catch (DruidException e) {
-if (e.getErrorCode().equals("notFound") || e.getMessage().contains("Unable to contact overlord")) {
+if (!e.getErrorCode().equals("notFound") && !e.getMessage().contains("Unable to contact overlord")) {
-return Optional.empty();
-}
throw e;
}
-};
+}
+}

if (SqlStatementState.FAILED == sqlStatementState) {
return SqlStatementResourceHelper.getExceptionPayload(
@ -608,7 +607,7 @@ public class SqlStatementResource
taskResponse,
statusPlus,
sqlStatementState,
-msqTaskReportPayloadSupplier.get().orElse(null),
+taskReportPayload,
jsonMapper,
detail
);
@ -627,9 +626,9 @@ public class SqlStatementResource
msqControllerTask.getQuerySpec().getDestination()
).orElse(null) : null,
null,
-detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
+SqlStatementResourceHelper.getQueryStagesReport(taskReportPayload),
-detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
+SqlStatementResourceHelper.getQueryCounters(taskReportPayload),
-detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayloadSupplier.get().orElse(null)) : null
+SqlStatementResourceHelper.getQueryWarningDetails(taskReportPayload)
));
}
}

View File

@ -22,11 +22,13 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
@ -34,8 +36,20 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.NullFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -44,8 +58,10 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -70,6 +86,7 @@ public class MSQComplexGroupByTest extends MSQTestBase
private String dataFileNameJsonString;
private String dataFileSignatureJsonString;
private DataSource dataFileExternalDataSource;
private File dataFile;
public static Collection<Object[]> data()
{
@ -85,7 +102,7 @@ public class MSQComplexGroupByTest extends MSQTestBase
@BeforeEach
public void setup() throws IOException
{
-File dataFile = newTempFile("dataFile");
+dataFile = newTempFile("dataFile");
final InputStream resourceStream = this.getClass().getClassLoader()
.getResourceAsStream(NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE);
final InputStream decompressing = CompressionUtils.decompress(
@ -416,4 +433,185 @@ public class MSQComplexGroupByTest extends MSQTestBase
))
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExactCountDistinctOnNestedData(String contextName, Map<String, Object> context)
{
Assumptions.assumeTrue(NullHandling.sqlCompatible());
RowSignature rowSignature = RowSignature.builder()
.add("distinct_obj", ColumnType.LONG)
.build();
Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder()
.putAll(context)
.put(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)
.build();
DimFilter innerFilter = NullHandling.replaceWithDefault()
? new SelectorDimFilter("d0", null, null)
: new NullFilter("d0", null);
testSelectQuery().setSql("SELECT\n"
+ " COUNT(DISTINCT obj) AS distinct_obj\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"obj\", \"type\": \"COMPLEX<json>\"}]'\n"
+ " )\n"
+ " )\n"
+ " ORDER BY 1")
.setQueryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false))
.setExpectedMSQSpec(
MSQSpec
.builder()
.query(
GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery
.builder()
.setDataSource(dataFileExternalDataSource)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setDimensions(
new DefaultDimensionSpec("obj", "d0", ColumnType.NESTED_DATA)
)
.setGranularity(Granularities.ALL)
.setContext(modifiedContext)
.build()
)
)
.setAggregatorSpecs(
new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
new NotDimFilter(innerFilter),
"a0"
)
)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.setLimitSpec(new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"a0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
))
.setContext(modifiedContext)
.build()
)
.columnMappings(new ColumnMappings(ImmutableList.of(
new ColumnMapping("a0", "distinct_obj")
)))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(TaskReportMSQDestination.INSTANCE)
.build()
)
.setExpectedRowSignature(rowSignature)
.setQueryContext(modifiedContext)
.setExpectedResultRows(ImmutableList.of(
new Object[]{7L}
))
.verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExactCountDistinctOnNestedData2(String contextName, Map<String, Object> context)
{
Assumptions.assumeTrue(NullHandling.sqlCompatible());
RowSignature dataFileSignature = RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("cObj", ColumnType.NESTED_DATA)
.build();
DataSource dataFileExternalDataSource2 = new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(dataFile), SystemFields.none()),
new JsonInputFormat(null, null, null, null, null),
dataFileSignature
);
RowSignature rowSignature = RowSignature.builder()
.add("distinct_obj", ColumnType.LONG)
.build();
Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder()
.putAll(context)
.put(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)
.build();
DimFilter innerFilter = NullHandling.replaceWithDefault()
? new SelectorDimFilter("d0", null, null)
: new NullFilter("d0", null);
testSelectQuery().setSql("SELECT\n"
+ " COUNT(DISTINCT cObj) AS distinct_obj\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"cObj\", \"type\": \"COMPLEX<json>\"}]'\n"
+ " )\n"
+ " )\n"
+ " ORDER BY 1")
.setQueryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false))
.setExpectedMSQSpec(
MSQSpec
.builder()
.query(
GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery
.builder()
.setDataSource(dataFileExternalDataSource2)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setDimensions(
new DefaultDimensionSpec("cObj", "d0", ColumnType.NESTED_DATA)
)
.setGranularity(Granularities.ALL)
.setContext(modifiedContext)
.build()
)
)
.setAggregatorSpecs(
new FilteredAggregatorFactory(
new CountAggregatorFactory("a0"),
new NotDimFilter(innerFilter),
"a0"
)
)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.setLimitSpec(new DefaultLimitSpec(
ImmutableList.of(
new OrderByColumnSpec(
"a0",
OrderByColumnSpec.Direction.ASCENDING,
StringComparators.NUMERIC
)
),
Integer.MAX_VALUE
))
.setContext(modifiedContext)
.build()
)
.columnMappings(new ColumnMappings(ImmutableList.of(
new ColumnMapping("a0", "distinct_obj")
)))
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(TaskReportMSQDestination.INSTANCE)
.build()
)
.setExpectedRowSignature(rowSignature)
.setQueryContext(modifiedContext)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1L}
))
.verifyResults();
}
}

View File

@ -640,6 +640,22 @@ public class WorkerTaskManager implements IndexerTaskCountStatsProvider
return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource);
}
@Override
public Map<String, Long> getWorkerFailedTasks()
{
return getNumTasksPerDatasource(completedTasks.entrySet().stream()
.filter(entry -> entry.getValue().getTaskStatus().isFailure())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
}
@Override
public Map<String, Long> getWorkerSuccessfulTasks()
{
return getNumTasksPerDatasource(completedTasks.entrySet().stream()
.filter(entry -> entry.getValue().getTaskStatus().isSuccess())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
}
private static class TaskDetails
{
private final Task task;
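The two new methods above boil down to "filter the completed tasks by status, then count per datasource". A self-contained sketch of that counting step; the generic signature is an assumption for illustration, not WorkerTaskManager's real API:

import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class TaskCountSketch
{
  // Counts elements per datasource after applying a status filter such as TaskStatus::isFailure.
  public static <T> Map<String, Long> countByDatasource(
      Collection<T> completedTasks,
      Predicate<T> statusFilter,
      Function<T, String> datasourceOf
  )
  {
    return completedTasks.stream()
                         .filter(statusFilter)
                         .collect(Collectors.groupingBy(datasourceOf, Collectors.counting()));
  }
}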

View File

@ -455,6 +455,19 @@ public class WorkerTaskManagerTest
return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}
private NoopTask createNoopFailingTask(String id, String dataSource)
{
return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
{
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
Thread.sleep(getRunTime());
return TaskStatus.failure(getId(), "Failed to complete the task");
}
};
}
/**
* Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble
* for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
@ -494,7 +507,7 @@ public class WorkerTaskManagerTest
Task task1 = createNoopTask("task1", "wikipedia");
Task task2 = createNoopTask("task2", "wikipedia");
-Task task3 = createNoopTask("task3", "animals");
+Task task3 = createNoopFailingTask("task3", "animals");
workerTaskManager.start();
// before assigning tasks we should get no running tasks
@ -517,11 +530,19 @@ public class WorkerTaskManagerTest
Thread.sleep(10);
} while (!runningTasks.isEmpty());
-// When running tasks are empty all task should be reported as completed
+// When running tasks are empty, all tasks should be reported as completed;
+// the task for the animals datasource should fail and the other 2 tasks in
+// the wikipedia datasource should succeed
Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of(
"wikipedia", 2L,
"animals", 1L
));
Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(), ImmutableMap.of(
"animals", 1L
));
Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(), ImmutableMap.of(
"wikipedia", 2L
));
Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
}
}

View File

@ -150,12 +150,16 @@ public class ComplexFieldReader implements FieldReader
private final Memory memory;
private final ReadableFieldPointer fieldPointer;
private final ComplexMetricSerde serde;
@SuppressWarnings("rawtypes")
private final Class clazz;
private Selector(Memory memory, ReadableFieldPointer fieldPointer, ComplexMetricSerde serde)
{
this.memory = memory;
this.fieldPointer = fieldPointer;
this.serde = serde;
//noinspection deprecation
this.clazz = serde.getObjectStrategy().getClazz();
}
@Nullable
@ -169,7 +173,8 @@ public class ComplexFieldReader implements FieldReader
@Override
public Class<T> classOfObject()
{
-return serde.getExtractor().extractedClass();
+//noinspection unchecked
+return clazz;
}
@Override

View File

@ -124,6 +124,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader
{
private final Frame frame;
private final ComplexMetricSerde serde;
private final Class<?> clazz;
private final Memory memory;
private final long startOfOffsetSection;
private final long startOfDataSection;
@ -138,6 +139,8 @@ public class ComplexFrameColumnReader implements FrameColumnReader
{
this.frame = frame;
this.serde = serde;
//noinspection deprecation
this.clazz = serde.getObjectStrategy().getClazz();
this.memory = memory;
this.startOfOffsetSection = startOfOffsetSection;
this.startOfDataSection = startOfDataSection;
@ -158,7 +161,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader
@Override
public Class<?> classOfObject()
{
-return serde.getExtractor().extractedClass();
+return clazz;
}
@Override

View File

@ -32,6 +32,7 @@ public class DruidMetrics
public static final String TYPE = "type";
public static final String INTERVAL = "interval";
public static final String ID = "id";
public static final String SUBQUERY_ID = "subQueryId";
public static final String TASK_ID = "taskId";
public static final String GROUP_ID = "groupId";
public static final String STATUS = "status";

View File

@ -37,7 +37,9 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.FrameSignaturePair;
@ -94,6 +96,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private static final Logger log = new Logger(ClientQuerySegmentWalker.class);
private static final int FRAME_SIZE = 8_000_000;
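// Names of the metrics emitted when a subquery is materialized, tagged with the parent query id and subquery id.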
public static final String ROWS_COUNT_METRIC = "subquery/rows";
public static final String BYTES_COUNT_METRIC = "subquery/bytes";
private final ServiceEmitter emitter;
private final QuerySegmentWalker clusterClient;
@ -447,7 +451,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
maxSubqueryRows,
maxSubqueryMemory,
useNestedForUnknownTypeInSubquery,
-subqueryStatsProvider
+subqueryStatsProvider,
+!dryRun,
+emitter
);
} else {
// Cannot inline subquery. Attempt to inline one level deeper, and then try again.
@ -553,8 +559,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
* It also plumbs parent query's id and sql id in case the subqueries don't have it set by default
*
* @param rootDataSource Datasource whose subqueries need to be populated
-* @param parentQueryId Parent Query's ID, can be null if do not need to update this in the subqueries
+* @param parentQueryId Parent Query's ID, can be null if it does not need to update this in the subqueries
-* @param parentSqlQueryId Parent Query's SQL Query ID, can be null if do not need to update this in the subqueries
+* @param parentSqlQueryId Parent Query's SQL Query ID, can be null if it does not need to update this in the subqueries
* @return DataSource populated with the subqueries
*/
private DataSource generateSubqueryIds(
@ -642,6 +648,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
/**
*/
/**
*
* Convert the results of a particular query into a materialized (List-based) InlineDataSource.
*
* @param query the query
@ -651,6 +660,13 @@ * particular master query
* particular master query
* @param limit user-configured limit. If negative, will be treated as {@link Integer#MAX_VALUE}.
* If zero, this method will throw an error immediately.
* @param memoryLimit User configured byte limit.
* @param useNestedForUnknownTypeInSubquery Uses nested json for unknown types when materializing subquery results
* @param subqueryStatsProvider Statistics about the subquery materialization
* @param emitMetrics Flag to control if the metrics need to be emitted while materializing. The metrics are omitted
* when we are performing a dry run of the query to avoid double reporting the same metric incorrectly
* @param emitter Metrics emitter
* @return Inlined datasource represented by the provided results
* @throws ResourceLimitExceededException if the limit is exceeded
*/
private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
@ -662,8 +678,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final AtomicBoolean cannotMaterializeToFrames,
final int limit,
long memoryLimit,
-boolean useNestedForUnknownTypeInSubquery,
+final boolean useNestedForUnknownTypeInSubquery,
-SubqueryCountStatsProvider subqueryStatsProvider
+final SubqueryCountStatsProvider subqueryStatsProvider,
+final boolean emitMetrics,
+final ServiceEmitter emitter
)
{
final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@ -683,7 +701,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
toolChest,
limitAccumulator,
limit,
-subqueryStatsProvider
+subqueryStatsProvider,
+emitMetrics,
+emitter
);
break;
case MEMORY_LIMIT:
@ -699,7 +719,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
memoryLimitAccumulator,
memoryLimit,
useNestedForUnknownTypeInSubquery,
-subqueryStatsProvider
+subqueryStatsProvider,
+emitMetrics,
+emitter
);
if (!maybeDataSource.isPresent()) {
cannotMaterializeToFrames.set(true);
@ -716,7 +738,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
toolChest,
limitAccumulator,
limit,
-subqueryStatsProvider
+subqueryStatsProvider,
+emitMetrics,
+emitter
);
} else {
subqueryStatsProvider.incrementSubqueriesWithByteLimit();
@ -739,9 +763,11 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final AtomicLong memoryLimitAccumulator,
-long memoryLimit,
+final long memoryLimit,
-boolean useNestedForUnknownTypeInSubquery,
+final boolean useNestedForUnknownTypeInSubquery,
-final SubqueryCountStatsProvider subqueryStatsProvider
+final SubqueryCountStatsProvider subqueryStatsProvider,
+final boolean emitMetrics,
+final ServiceEmitter emitter
)
{
Optional<Sequence<FrameSignaturePair>> framesOptional;
@ -764,6 +790,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
startedAccumulating = true;
final int initialSubqueryRows = limitAccumulator.get();
final long initialSubqueryBytes = memoryLimitAccumulator.get();
frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
@ -775,6 +803,21 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
frameSignaturePairs.add(frame);
}
);
if (emitMetrics) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.ID, query.getId())
.setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
.setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.ID, query.getId())
.setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
.setMetric(BYTES_COUNT_METRIC, memoryLimitAccumulator.get() - initialSubqueryBytes)
);
}
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
}
catch (UnsupportedColumnTypeException e) {
@ -811,7 +854,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final int limit,
-final SubqueryCountStatsProvider subqueryStatsProvider
+final SubqueryCountStatsProvider subqueryStatsProvider,
+boolean emitMetrics,
+final ServiceEmitter emitter
)
{
final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@ -819,6 +864,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
final ArrayList<Object[]> resultList = new ArrayList<>();
final int initialSubqueryRows = limitAccumulator.get();
toolChest.resultsAsArrays(query, results).accumulate(
resultList,
(acc, in) -> {
@ -830,6 +876,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return acc;
}
);
if (emitMetrics) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.ID, query.getId())
.setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
.setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
);
}
return InlineDataSource.fromIterable(resultList, signature);
}
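The row-based materialization path above accumulates rows into a limit counter that is shared across all subqueries of the parent query, and reports only the delta for the current subquery as `subquery/rows`. A simplified, self-contained sketch of that limit accounting; the method shape and the exception used here are assumptions, not the real private API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SubqueryRowLimitSketch
{
  public static List<Object[]> accumulateWithLimit(Iterable<Object[]> rows, AtomicInteger limitAccumulator, int limit)
  {
    // A negative limit means "unlimited", mirroring the rowLimitToUse computation in the diff.
    final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
    final List<Object[]> resultList = new ArrayList<>();
    // ClientQuerySegmentWalker snapshots limitAccumulator before accumulating so it can emit
    // only the rows added by this particular subquery as the subquery/rows metric.
    for (Object[] row : rows) {
      if (limitAccumulator.incrementAndGet() > rowLimitToUse) {
        throw new IllegalStateException("Cannot materialize subquery results beyond " + rowLimitToUse + " rows");
      }
      resultList.add(row);
    }
    return resultList;
  }
}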

View File

@ -41,4 +41,8 @@ public interface IndexerTaskCountStatsProvider
* Map from datasource name to the number of completed tasks by the Indexer.
*/
Map<String, Long> getWorkerCompletedTasks();
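/**
* Map from datasource name to the number of failed tasks by the Indexer.
*/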
Map<String, Long> getWorkerFailedTasks();
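/**
* Map from datasource name to the number of successfully completed tasks by the Indexer.
*/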
Map<String, Long> getWorkerSuccessfulTasks();
}

View File

@ -72,6 +72,8 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor
emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks()); emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks());
emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks()); emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks());
emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks()); emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks());
emit(emitter, "worker/task/failed/count", indexerStatsProvider.getWorkerFailedTasks());
emit(emitter, "worker/task/success/count", indexerStatsProvider.getWorkerSuccessfulTasks());
}
return true;
}
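Each emit call above fans a per-datasource map out into one event per datasource, so `worker/task/failed/count` and `worker/task/success/count` arrive tagged with a `dataSource` dimension, as the updated monitor test later in this commit verifies. A minimal sketch of that fan-out; the helper name is illustrative:

import java.util.Map;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

public class WorkerTaskMetricSketch
{
  // One event per datasource, e.g. worker/task/failed/count with dataSource=animals.
  public static void emitPerDatasource(ServiceEmitter emitter, String metricName, Map<String, Long> countsByDatasource)
  {
    for (Map.Entry<String, Long> entry : countsByDatasource.entrySet()) {
      emitter.emit(
          ServiceMetricEvent.builder()
                            .setDimension("dataSource", entry.getKey())
                            .setMetric(metricName, entry.getValue())
      );
    }
  }
}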

View File

@ -32,8 +32,12 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.GlobalTableDataSource;
@ -214,6 +218,7 @@ public class ClientQuerySegmentWalkerTest
private Closer closer;
private QueryRunnerFactoryConglomerate conglomerate;
private final StubServiceEmitter emitter = new StubServiceEmitter();
// Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter.
private final List<ExpectedQuery> issuedQueries = new ArrayList<>();
@ -228,6 +233,7 @@ public class ClientQuerySegmentWalkerTest
public void setUp()
{
closer = Closer.create();
emitter.flush();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
scheduler = new ObservableQueryScheduler(
8,
@ -941,6 +947,113 @@ public class ClientQuerySegmentWalkerTest
testQuery(query, ImmutableList.of(), ImmutableList.of());
}
@Test
public void testMetricsWithMaxSubqueryRowsEnabled()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();
final TimeseriesQuery query =
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build()
.withId(DUMMY_QUERY_ID);
testQuery(
query,
new ArrayList<>(ImmutableList.of(
ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
RowSignature.builder().add("s", ColumnType.STRING).build()
)
)
)
)),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
List<Event> events = emitter.getEvents();
for (Event event : events) {
EventMap map = event.toMap();
if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
Assert.assertTrue(map.containsKey("service"));
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
Assert.assertEquals(3, map.get("value"));
}
}
}
@Test
public void testMetricsWithMaxSubqueryBytesEnabled()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();
final TimeseriesQuery query =
(TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.context(ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "10000"))
.build()
.withId(DUMMY_QUERY_ID);
testQuery(
query,
new ArrayList<>(ImmutableList.of(
ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
RowSignature.builder().add("s", ColumnType.STRING).build()
)
)
)
)),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
List<Event> events = emitter.getEvents();
for (Event event : events) {
EventMap map = event.toMap();
if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
Assert.assertTrue(map.containsKey("service"));
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
Assert.assertEquals(3, map.get("value"));
} else if (ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) {
Assert.assertTrue(map.containsKey("host"));
Assert.assertTrue(map.containsKey("service"));
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
Assert.assertEquals(43L, map.get("value"));
}
}
}
@Test
public void testGroupByOnArraysDoubles()
{
@ -1545,13 +1658,15 @@ public class ClientQuerySegmentWalkerTest
conglomerate,
segmentWrangler,
joinableFactoryWrapper,
-schedulerForTest
+schedulerForTest,
+emitter
),
ClusterOrLocal.LOCAL
),
conglomerate,
joinableFactory,
-serverConfig
+serverConfig,
+emitter
);
}

View File

@ -87,7 +87,6 @@ import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
@ -117,7 +116,6 @@ public class QueryStackTests
public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
-private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
private QueryStackTests()
@ -131,11 +129,12 @@ public class QueryStackTests
final QuerySegmentWalker localWalker,
final QueryRunnerFactoryConglomerate conglomerate,
final JoinableFactory joinableFactory,
-final ServerConfig serverConfig
+final ServerConfig serverConfig,
+final ServiceEmitter emitter
)
{
return new ClientQuerySegmentWalker(
-EMITTER,
+emitter,
clusterWalker,
localWalker,
new QueryToolChestWarehouse()
@ -172,7 +171,8 @@ public class QueryStackTests
final QueryRunnerFactoryConglomerate conglomerate,
final SegmentWrangler segmentWrangler,
final JoinableFactoryWrapper joinableFactoryWrapper,
-final QueryScheduler scheduler
+final QueryScheduler scheduler,
+final ServiceEmitter emitter
)
{
return new LocalQuerySegmentWalker(
@ -180,7 +180,7 @@ public class QueryStackTests
segmentWrangler,
joinableFactoryWrapper,
scheduler,
-EMITTER
+emitter
);
}

View File

@ -49,6 +49,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
@ -133,11 +134,13 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
conglomerate,
segmentWrangler,
joinableFactoryWrapper,
-scheduler
+scheduler,
+new NoopServiceEmitter()
),
conglomerate,
joinableFactoryWrapper.getJoinableFactory(),
-new ServerConfig()
+new ServerConfig(),
+new NoopServiceEmitter()
)
);
}

View File

@ -120,6 +120,24 @@ public class WorkerTaskCountStatsMonitorTest
"metrics", 9L "metrics", 9L
); );
} }
@Override
public Map<String, Long> getWorkerFailedTasks()
{
return ImmutableMap.of(
"movies", 4L,
"games", 6L
);
}
@Override
public Map<String, Long> getWorkerSuccessfulTasks()
{
return ImmutableMap.of(
"games", 23L,
"inventory", 89L
);
}
};
nullStatsProvider = new WorkerTaskCountStatsProvider()
@ -239,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest
new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER));
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
-Assert.assertEquals(6, emitter.getEvents().size());
+Assert.assertEquals(10, emitter.getEvents().size());
emitter.verifyValue(
"worker/task/running/count",
ImmutableMap.of("dataSource", "wikipedia"),
@ -270,6 +288,26 @@ public class WorkerTaskCountStatsMonitorTest
ImmutableMap.of("dataSource", "metrics"), ImmutableMap.of("dataSource", "metrics"),
9L 9L
); );
emitter.verifyValue(
"worker/task/failed/count",
ImmutableMap.of("dataSource", "movies"),
4L
);
emitter.verifyValue(
"worker/task/failed/count",
ImmutableMap.of("dataSource", "games"),
6L
);
emitter.verifyValue(
"worker/task/success/count",
ImmutableMap.of("dataSource", "games"),
23L
);
emitter.verifyValue(
"worker/task/success/count",
ImmutableMap.of("dataSource", "inventory"),
89L
);
}
@Test
public void testMonitorWithNulls()