diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index aa35b0681ec..ec97f44fe39 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -77,6 +77,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||| |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise| |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| +|`subquery/rows`|Number of rows materialized as the result of a subquery.|`id`, `subQueryId`|Varies| +|`subquery/bytes`|Number of bytes materialized as the result of a subquery. This metric is emitted only if the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries).|`id`, `subQueryId`|Varies| |`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | @@ -297,6 +299,8 @@ If the JVM does not support CPU time measurement for the current thread, `ingest |`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies| |`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| |`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/failed/count`|Number of tasks that failed on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/success/count`|Number of tasks that succeeded on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| |`worker/task/running/count`|Number of tasks running on an indexer per emission period. 
This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index ad065c63d39..91d7c4c4abd 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -74,8 +74,8 @@ "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" }, - "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, - "worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, + "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" }, + "worker/task/success/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" }, "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/total/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/used/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 322727aea92..f5662280478 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -113,7 +113,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.function.Supplier; import java.util.stream.Collectors; @@ -588,19 +587,19 @@ public class SqlStatementResource MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction); SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus); - Supplier<Optional<MSQTaskReportPayload>> msqTaskReportPayloadSupplier = () -> { + MSQTaskReportPayload taskReportPayload = null; + if (detail || SqlStatementState.FAILED == sqlStatementState) { try { - return Optional.ofNullable(SqlStatementResourceHelper.getPayload( + taskReportPayload = SqlStatementResourceHelper.getPayload( contactOverlord(overlordClient.taskReportAsMap(queryId), queryId) - )); + ); } catch (DruidException e) { - if (e.getErrorCode().equals("notFound") || e.getMessage().contains("Unable to contact overlord")) { - return Optional.empty(); + if (!e.getErrorCode().equals("notFound") && !e.getMessage().contains("Unable to contact overlord")) { + throw e; } - throw e; } - }; + } if (SqlStatementState.FAILED == sqlStatementState) { return SqlStatementResourceHelper.getExceptionPayload( @@ -608,7 +607,7 @@ public class SqlStatementResource taskResponse, statusPlus, sqlStatementState, - msqTaskReportPayloadSupplier.get().orElse(null), + taskReportPayload, jsonMapper, detail ); @@ -627,9 +626,9 @@ public class SqlStatementResource 
msqControllerTask.getQuerySpec().getDestination() ).orElse(null) : null, null, - detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayloadSupplier.get().orElse(null)) : null, - detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayloadSupplier.get().orElse(null)) : null, - detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayloadSupplier.get().orElse(null)) : null + SqlStatementResourceHelper.getQueryStagesReport(taskReportPayload), + SqlStatementResourceHelper.getQueryCounters(taskReportPayload), + SqlStatementResourceHelper.getQueryWarningDetails(taskReportPayload) )); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java index da4f975b24a..f937a988d8e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java @@ -22,11 +22,13 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.guice.BuiltInTypesModule; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; @@ -34,8 +36,20 @@ import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.DataSource; import org.apache.druid.query.NestedDataTestUtils; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.filter.NotDimFilter; +import org.apache.druid.query.filter.NullFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -44,8 +58,10 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.SegmentId; import org.apache.druid.utils.CompressionUtils; +import org.junit.jupiter.api.Assumptions; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -70,6 +86,7 @@ public class MSQComplexGroupByTest extends MSQTestBase private String dataFileNameJsonString; private String dataFileSignatureJsonString; private DataSource dataFileExternalDataSource; + private File dataFile; public static Collection<Object[]> data() { @@ -85,9 +102,9 @@ public class MSQComplexGroupByTest extends MSQTestBase @BeforeEach public void setup() throws IOException { - File dataFile = newTempFile("dataFile"); + dataFile = newTempFile("dataFile"); final InputStream resourceStream = this.getClass().getClassLoader() - .getResourceAsStream(NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE); + .getResourceAsStream(NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE); final InputStream decompressing = CompressionUtils.decompress( resourceStream, "nested-all-types-test-data.json" @@ -416,4 +433,185 @@ public class MSQComplexGroupByTest extends MSQTestBase )) .verifyResults(); } + + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testExactCountDistinctOnNestedData(String contextName, Map<String, Object> context) + { + Assumptions.assumeTrue(NullHandling.sqlCompatible()); + RowSignature rowSignature = RowSignature.builder() + .add("distinct_obj", ColumnType.LONG) + .build(); + + Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder() + .putAll(context) + .put(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false) + .build(); + + DimFilter innerFilter = NullHandling.replaceWithDefault() + ? new SelectorDimFilter("d0", null, null) + : new NullFilter("d0", null); + + testSelectQuery().setSql("SELECT\n" + + " COUNT(DISTINCT obj) AS distinct_obj\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"obj\", \"type\": \"COMPLEX<json>\"}]'\n" + + " )\n" + + " )\n" + + " ORDER BY 1") + .setQueryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)) + .setExpectedMSQSpec( + MSQSpec + .builder() + .query( + GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery + .builder() + .setDataSource(dataFileExternalDataSource) + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setDimensions( + new DefaultDimensionSpec("obj", "d0", ColumnType.NESTED_DATA) + ) + .setGranularity(Granularities.ALL) + .setContext(modifiedContext) + .build() + ) + ) + .setAggregatorSpecs( + new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + new NotDimFilter(innerFilter), + "a0" + ) + ) + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .setLimitSpec(new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec( + "a0", + OrderByColumnSpec.Direction.ASCENDING, + StringComparators.NUMERIC + ) + ), + Integer.MAX_VALUE + )) + .setContext(modifiedContext) + .build() + ) + .columnMappings(new ColumnMappings(ImmutableList.of( + new ColumnMapping("a0", "distinct_obj") + ))) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(rowSignature) + .setQueryContext(modifiedContext) + .setExpectedResultRows(ImmutableList.of( + new Object[]{7L} + )) + .verifyResults(); + } + + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void 
testExactCountDistinctOnNestedData2(String contextName, Map<String, Object> context) + { + Assumptions.assumeTrue(NullHandling.sqlCompatible()); + RowSignature dataFileSignature = RowSignature.builder() + .add("timestamp", ColumnType.STRING) + .add("cObj", ColumnType.NESTED_DATA) + .build(); + DataSource dataFileExternalDataSource2 = new ExternalDataSource( + new LocalInputSource(null, null, ImmutableList.of(dataFile), SystemFields.none()), + new JsonInputFormat(null, null, null, null, null), + dataFileSignature + ); + RowSignature rowSignature = RowSignature.builder() + .add("distinct_obj", ColumnType.LONG) + .build(); + + Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder() + .putAll(context) + .put(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false) + .build(); + + DimFilter innerFilter = NullHandling.replaceWithDefault() + ? new SelectorDimFilter("d0", null, null) + : new NullFilter("d0", null); + + testSelectQuery().setSql("SELECT\n" + + " COUNT(DISTINCT cObj) AS distinct_obj\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"cObj\", \"type\": \"COMPLEX<json>\"}]'\n" + + " )\n" + + " )\n" + + " ORDER BY 1") + .setQueryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)) + .setExpectedMSQSpec( + MSQSpec + .builder() + .query( + GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery + .builder() + .setDataSource(dataFileExternalDataSource2) + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setDimensions( + new DefaultDimensionSpec("cObj", "d0", ColumnType.NESTED_DATA) + ) + .setGranularity(Granularities.ALL) + .setContext(modifiedContext) + .build() + ) + ) + .setAggregatorSpecs( + new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + new NotDimFilter(innerFilter), + "a0" + ) + ) + .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .setLimitSpec(new DefaultLimitSpec( + ImmutableList.of( + new OrderByColumnSpec( + "a0", + OrderByColumnSpec.Direction.ASCENDING, + StringComparators.NUMERIC + ) + ), + Integer.MAX_VALUE + )) + .setContext(modifiedContext) + .build() + ) + .columnMappings(new ColumnMappings(ImmutableList.of( + new ColumnMapping("a0", "distinct_obj") + ))) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .destination(TaskReportMSQDestination.INSTANCE) + .build() + ) + .setExpectedRowSignature(rowSignature) + .setQueryContext(modifiedContext) + .setExpectedResultRows(ImmutableList.of( + new Object[]{1L} + )) + .verifyResults(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 729ac1d1617..a1c131ad0f4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -640,6 +640,22 @@ public class WorkerTaskManager implements IndexerTaskCountStatsProvider return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource); } + @Override + public Map<String, Long> getWorkerFailedTasks() + { + return getNumTasksPerDatasource(completedTasks.entrySet().stream() + .filter(entry -> entry.getValue().getTaskStatus().isFailure()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), 
TaskAnnouncement::getTaskDataSource); + } + + @Override + public Map<String, Long> getWorkerSuccessfulTasks() + { + return getNumTasksPerDatasource(completedTasks.entrySet().stream() + .filter(entry -> entry.getValue().getTaskStatus().isSuccess()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource); + } + private static class TaskDetails { private final Task task; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 6b08be3a3c6..37839f8e077 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -455,6 +455,19 @@ public class WorkerTaskManagerTest return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); } + private NoopTask createNoopFailingTask(String id, String dataSource) + { + return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)) + { + @Override + public TaskStatus runTask(TaskToolbox toolbox) throws Exception + { + Thread.sleep(getRunTime()); + return TaskStatus.failure(getId(), "Failed to complete the task"); + } + }; + } + /** * Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble * for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}. @@ -494,7 +507,7 @@ public class WorkerTaskManagerTest Task task1 = createNoopTask("task1", "wikipedia"); Task task2 = createNoopTask("task2", "wikipedia"); - Task task3 = createNoopTask("task3", "animals"); + Task task3 = createNoopFailingTask("task3", "animals"); workerTaskManager.start(); // before assigning tasks we should get no running tasks do { runningTasks = workerTaskManager.getRunningTasks(); Thread.sleep(10); } while (!runningTasks.isEmpty()); - // When running tasks are empty all task should be reported as completed + // When running tasks are empty, all tasks should be reported as completed: + // the task for the animals datasource should fail and the other two tasks + // for the wikipedia datasource should succeed Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of( "wikipedia", 2L, "animals", 1L )); + Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(), ImmutableMap.of( + "animals", 1L + )); + Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(), ImmutableMap.of( + "wikipedia", 2L + )); Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L); } } diff --git a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java index c0a0c872fae..29bf0945adb 100644 --- a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java +++ b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java @@ -150,12 +150,16 @@ public class ComplexFieldReader implements FieldReader private final Memory memory; private final ReadableFieldPointer fieldPointer; private final ComplexMetricSerde serde; + @SuppressWarnings("rawtypes") + private final Class clazz; private Selector(Memory memory, ReadableFieldPointer fieldPointer, ComplexMetricSerde serde) { this.memory = memory; this.fieldPointer = fieldPointer; this.serde = serde; + //noinspection deprecation + this.clazz = 
serde.getObjectStrategy().getClazz(); } @Nullable @@ -169,7 +173,8 @@ public class ComplexFieldReader implements FieldReader @Override public Class classOfObject() { - return serde.getExtractor().extractedClass(); + //noinspection unchecked + return clazz; } @Override diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java index 7059d76f391..23f3a7d7432 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java @@ -124,6 +124,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader { private final Frame frame; private final ComplexMetricSerde serde; + private final Class clazz; private final Memory memory; private final long startOfOffsetSection; private final long startOfDataSection; @@ -138,6 +139,8 @@ public class ComplexFrameColumnReader implements FrameColumnReader { this.frame = frame; this.serde = serde; + //noinspection deprecation + this.clazz = serde.getObjectStrategy().getClazz(); this.memory = memory; this.startOfOffsetSection = startOfOffsetSection; this.startOfDataSection = startOfDataSection; @@ -158,7 +161,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader @Override public Class classOfObject() { - return serde.getExtractor().extractedClass(); + return clazz; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java index f85dd001600..1f478809333 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java @@ -32,6 +32,7 @@ public class DruidMetrics public static final String TYPE = "type"; public static final String INTERVAL = "interval"; public static final String ID = "id"; + public static final String SUBQUERY_ID = "subQueryId"; public static final String TASK_ID = "taskId"; public static final String GROUP_ID = "groupId"; public static final String STATUS = "status"; diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index d49ce3909f7..990878eda6e 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -37,7 +37,9 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DataSource; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.FluentQueryRunner; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.FrameSignaturePair; @@ -94,6 +96,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private static final Logger log = new Logger(ClientQuerySegmentWalker.class); private static final int FRAME_SIZE = 8_000_000; + public static final String ROWS_COUNT_METRIC = "subquery/rows"; + public static final String BYTES_COUNT_METRIC = "subquery/bytes"; private final ServiceEmitter emitter; 
private final QuerySegmentWalker clusterClient; @@ -447,7 +451,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker maxSubqueryRows, maxSubqueryMemory, useNestedForUnknownTypeInSubquery, - subqueryStatsProvider + subqueryStatsProvider, + !dryRun, + emitter ); } else { // Cannot inline subquery. Attempt to inline one level deeper, and then try again. @@ -553,8 +559,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker * It also plumbs parent query's id and sql id in case the subqueries don't have it set by default * * @param rootDataSource Datasource whose subqueries need to be populated - * @param parentQueryId Parent Query's ID, can be null if do not need to update this in the subqueries - * @param parentSqlQueryId Parent Query's SQL Query ID, can be null if do not need to update this in the subqueries + * @param parentQueryId Parent query's ID; can be null if it does not need to be set on the subqueries + * @param parentSqlQueryId Parent query's SQL query ID; can be null if it does not need to be set on the subqueries * @return DataSource populated with the subqueries */ private DataSource generateSubqueryIds( @@ -651,6 +660,13 @@ public class ClientQuerySegmentWalker * particular master query * @param limit user-configured limit. If negative, will be treated as {@link Integer#MAX_VALUE}. * If zero, this method will throw an error immediately. + * @param memoryLimit User-configured byte limit. + * @param useNestedForUnknownTypeInSubquery Whether to use nested JSON for unknown types when materializing subquery results + * @param subqueryStatsProvider Statistics about the subquery materialization + * @param emitMetrics Whether to emit metrics while materializing. Metrics are not emitted during a dry run of the + * query, to avoid reporting the same metric twice + * @param emitter Metrics emitter + * @return Inline datasource containing the provided results * @throws ResourceLimitExceededException if the limit is exceeded */ private static <T, QueryType extends Query<T>> DataSource toInlineDataSource( @@ -662,8 +678,10 @@ public class ClientQuerySegmentWalker final AtomicBoolean cannotMaterializeToFrames, final int limit, long memoryLimit, - boolean useNestedForUnknownTypeInSubquery, - SubqueryCountStatsProvider subqueryStatsProvider + final boolean useNestedForUnknownTypeInSubquery, + final SubqueryCountStatsProvider subqueryStatsProvider, + final boolean emitMetrics, + final ServiceEmitter emitter ) { final int rowLimitToUse = limit < 0 ? 
Integer.MAX_VALUE : limit; @@ -683,7 +701,9 @@ public class ClientQuerySegmentWalker toolChest, limitAccumulator, limit, - subqueryStatsProvider + subqueryStatsProvider, + emitMetrics, + emitter ); break; case MEMORY_LIMIT: @@ -699,7 +719,9 @@ public class ClientQuerySegmentWalker memoryLimitAccumulator, memoryLimit, useNestedForUnknownTypeInSubquery, - subqueryStatsProvider + subqueryStatsProvider, + emitMetrics, + emitter ); if (!maybeDataSource.isPresent()) { cannotMaterializeToFrames.set(true); @@ -716,7 +738,9 @@ public class ClientQuerySegmentWalker toolChest, limitAccumulator, limit, - subqueryStatsProvider + subqueryStatsProvider, + emitMetrics, + emitter ); } else { subqueryStatsProvider.incrementSubqueriesWithByteLimit(); @@ -739,9 +763,11 @@ public class ClientQuerySegmentWalker final QueryToolChest<T, QueryType> toolChest, final AtomicInteger limitAccumulator, final AtomicLong memoryLimitAccumulator, - long memoryLimit, - boolean useNestedForUnknownTypeInSubquery, - final SubqueryCountStatsProvider subqueryStatsProvider + final long memoryLimit, + final boolean useNestedForUnknownTypeInSubquery, + final SubqueryCountStatsProvider subqueryStatsProvider, + final boolean emitMetrics, + final ServiceEmitter emitter ) { Optional<Sequence<FrameSignaturePair>> framesOptional; @@ -764,6 +790,8 @@ public class ClientQuerySegmentWalker startedAccumulating = true; + final int initialSubqueryRows = limitAccumulator.get(); + final long initialSubqueryBytes = memoryLimitAccumulator.get(); frames.forEach( frame -> { limitAccumulator.addAndGet(frame.getFrame().numRows()); @@ -775,6 +803,21 @@ public class ClientQuerySegmentWalker frameSignaturePairs.add(frame); } ); + if (emitMetrics) { + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.ID, query.getId()) + .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId()) + .setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows) + ); + + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.ID, query.getId()) + .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId()) + .setMetric(BYTES_COUNT_METRIC, memoryLimitAccumulator.get() - initialSubqueryBytes) + ); + } return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query))); } catch (UnsupportedColumnTypeException e) { @@ -811,7 +854,9 @@ public class ClientQuerySegmentWalker final QueryToolChest<T, QueryType> toolChest, final AtomicInteger limitAccumulator, final int limit, - final SubqueryCountStatsProvider subqueryStatsProvider + final SubqueryCountStatsProvider subqueryStatsProvider, + final boolean emitMetrics, + final ServiceEmitter emitter ) { final int rowLimitToUse = limit < 0 ? 
Integer.MAX_VALUE : limit; @@ -819,6 +864,7 @@ public class ClientQuerySegmentWalker final ArrayList<Object[]> resultList = new ArrayList<>(); + final int initialSubqueryRows = limitAccumulator.get(); toolChest.resultsAsArrays(query, results).accumulate( resultList, (acc, in) -> { @@ -830,6 +876,14 @@ public class ClientQuerySegmentWalker return acc; } ); + if (emitMetrics) { + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.ID, query.getId()) + .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId()) + .setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows) + ); + } return InlineDataSource.fromIterable(resultList, signature); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java index 735bc27abb3..b38b461eb36 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java @@ -41,4 +41,14 @@ public interface IndexerTaskCountStatsProvider * Map from datasource name to the number of completed tasks by the Indexer. */ Map<String, Long> getWorkerCompletedTasks(); + + /** + * Map from datasource name to the number of failed tasks by the Indexer. + */ + Map<String, Long> getWorkerFailedTasks(); + + /** + * Map from datasource name to the number of successful tasks by the Indexer. + */ + Map<String, Long> getWorkerSuccessfulTasks(); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java index d07311c1a46..bc09e95b5ce 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java @@ -72,6 +72,8 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks()); emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks()); emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks()); + emit(emitter, "worker/task/failed/count", indexerStatsProvider.getWorkerFailedTasks()); + emit(emitter, "worker/task/success/count", indexerStatsProvider.getWorkerSuccessfulTasks()); } return true; } diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 038fbce7d45..467f375f9f7 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -32,8 +32,12 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DataSource; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.Druids; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.GlobalTableDataSource; @@ -214,6 +218,7 @@ public class ClientQuerySegmentWalkerTest private Closer closer; private 
QueryRunnerFactoryConglomerate conglomerate; + private final StubServiceEmitter emitter = new StubServiceEmitter(); // Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter. private final List<ExpectedQuery> issuedQueries = new ArrayList<>(); @@ -228,6 +233,7 @@ public class ClientQuerySegmentWalkerTest public void setUp() { closer = Closer.create(); + emitter.flush(); conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer); scheduler = new ObservableQueryScheduler( 8, @@ -941,6 +947,113 @@ public class ClientQuerySegmentWalkerTest testQuery(query, ImmutableList.of(), ImmutableList.of()); } + @Test + public void testMetricsWithMaxSubqueryRowsEnabled() + { + final GroupByQuery subquery = + GroupByQuery.builder() + .setDataSource(FOO) + .setGranularity(Granularities.ALL) + .setInterval(Collections.singletonList(INTERVAL)) + .setDimensions(DefaultDimensionSpec.of("s")) + .build(); + + final TimeseriesQuery query = + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .aggregators(new CountAggregatorFactory("cnt")) + .build() + .withId(DUMMY_QUERY_ID); + + testQuery( + query, + new ArrayList<>(ImmutableList.of( + ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")), + ExpectedQuery.local( + query.withDataSource( + InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}), + RowSignature.builder().add("s", ColumnType.STRING).build() + ) + ) + ) + )), + ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) + ); + + List<Event> events = emitter.getEvents(); + + for (Event event : events) { + EventMap map = event.toMap(); + if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) { + Assert.assertTrue(map.containsKey("host")); + Assert.assertTrue(map.containsKey("service")); + Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); + Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); + Assert.assertEquals(3, map.get("value")); + } + } + } + + @Test + public void testMetricsWithMaxSubqueryBytesEnabled() + { + final GroupByQuery subquery = + GroupByQuery.builder() + .setDataSource(FOO) + .setGranularity(Granularities.ALL) + .setInterval(Collections.singletonList(INTERVAL)) + .setDimensions(DefaultDimensionSpec.of("s")) + .build(); + + final TimeseriesQuery query = + (TimeseriesQuery) Druids.newTimeseriesQueryBuilder() + .dataSource(new QueryDataSource(subquery)) + .granularity(Granularities.ALL) + .intervals(Intervals.ONLY_ETERNITY) + .aggregators(new CountAggregatorFactory("cnt")) + .context(ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "10000")) + .build() + .withId(DUMMY_QUERY_ID); + + testQuery( + query, + new ArrayList<>(ImmutableList.of( + ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")), + ExpectedQuery.local( + query.withDataSource( + InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}), + RowSignature.builder().add("s", ColumnType.STRING).build() + ) + ) + ) + )), + ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) + ); + + List<Event> events = emitter.getEvents(); + + for (Event event : events) { + EventMap map = event.toMap(); + if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) { + Assert.assertTrue(map.containsKey("host")); + Assert.assertTrue(map.containsKey("service")); + 
Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); + Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); + Assert.assertEquals(3, map.get("value")); + } else if (ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) { + Assert.assertTrue(map.containsKey("host")); + Assert.assertTrue(map.containsKey("service")); + Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); + Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); + Assert.assertEquals(43L, map.get("value")); + } + } + } + @Test public void testGroupByOnArraysDoubles() { @@ -1545,13 +1658,15 @@ public class ClientQuerySegmentWalkerTest conglomerate, segmentWrangler, joinableFactoryWrapper, - schedulerForTest - ), + schedulerForTest, + emitter + ), ClusterOrLocal.LOCAL ), conglomerate, joinableFactory, - serverConfig + serverConfig, + emitter ); } diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index f6bfebcf344..041c4654d92 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -87,7 +87,6 @@ import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.metrics.SubqueryCountStatsProvider; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; @@ -117,7 +116,6 @@ public class QueryStackTests public static final int DEFAULT_NUM_MERGE_BUFFERS = -1; - private static final ServiceEmitter EMITTER = new NoopServiceEmitter(); private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024; private QueryStackTests() @@ -131,11 +129,12 @@ public class QueryStackTests final QuerySegmentWalker localWalker, final QueryRunnerFactoryConglomerate conglomerate, final JoinableFactory joinableFactory, - final ServerConfig serverConfig + final ServerConfig serverConfig, + final ServiceEmitter emitter ) { return new ClientQuerySegmentWalker( - EMITTER, + emitter, clusterWalker, localWalker, new QueryToolChestWarehouse() @@ -172,7 +171,8 @@ public class QueryStackTests final QueryRunnerFactoryConglomerate conglomerate, final SegmentWrangler segmentWrangler, final JoinableFactoryWrapper joinableFactoryWrapper, - final QueryScheduler scheduler + final QueryScheduler scheduler, + final ServiceEmitter emitter ) { return new LocalQuerySegmentWalker( @@ -180,7 +180,7 @@ public class QueryStackTests segmentWrangler, joinableFactoryWrapper, scheduler, - EMITTER + emitter ); } diff --git a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java index d9301f28cc0..746102cc6f5 100644 --- a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java @@ -49,6 +49,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.initialization.ServerConfig; +import 
org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; @@ -133,11 +134,13 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C conglomerate, segmentWrangler, joinableFactoryWrapper, - scheduler + scheduler, + new NoopServiceEmitter() ), conglomerate, joinableFactoryWrapper.getJoinableFactory(), - new ServerConfig() + new ServerConfig(), + new NoopServiceEmitter() ) ); } diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java index ff9fcffb8d9..ad00e5e6dbd 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java @@ -120,6 +120,24 @@ public class WorkerTaskCountStatsMonitorTest "metrics", 9L ); } + + @Override + public Map<String, Long> getWorkerFailedTasks() + { + return ImmutableMap.of( + "movies", 4L, + "games", 6L + ); + } + + @Override + public Map<String, Long> getWorkerSuccessfulTasks() + { + return ImmutableMap.of( + "games", 23L, + "inventory", 89L + ); + } }; nullStatsProvider = new WorkerTaskCountStatsProvider() @@ -239,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(6, emitter.getEvents().size()); + Assert.assertEquals(10, emitter.getEvents().size()); emitter.verifyValue( "worker/task/running/count", ImmutableMap.of("dataSource", "wikipedia"), 5L ); @@ -270,6 +288,26 @@ public class WorkerTaskCountStatsMonitorTest ImmutableMap.of("dataSource", "metrics"), 9L ); + emitter.verifyValue( + "worker/task/failed/count", + ImmutableMap.of("dataSource", "movies"), + 4L + ); + emitter.verifyValue( + "worker/task/failed/count", + ImmutableMap.of("dataSource", "games"), + 6L + ); + emitter.verifyValue( + "worker/task/success/count", + ImmutableMap.of("dataSource", "games"), + 23L + ); + emitter.verifyValue( + "worker/task/success/count", + ImmutableMap.of("dataSource", "inventory"), + 89L + ); } @Test public void testMonitorWithNulls()
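
Editor's note: the `subquery/rows` and `subquery/bytes` values emitted in `ClientQuerySegmentWalker` above are per-subquery deltas. `limitAccumulator` and `memoryLimitAccumulator` are shared across all subqueries of one master query (they enforce the global guardrails), so the code snapshots them before materializing and emits the difference afterwards. A minimal self-contained sketch of that pattern, assuming a simplified stand-in `Emitter` interface rather than Druid's actual `ServiceEmitter` API:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SubqueryRowMetricSketch
{
  /** Illustrative stand-in for Druid's ServiceEmitter; not the real API. */
  interface Emitter
  {
    void emit(String metric, long value);
  }

  /**
   * Materializes one subquery's rows. The accumulator is cumulative across all
   * subqueries of the master query, so the per-subquery metric is the
   * before/after delta rather than the accumulator's running total.
   */
  static void materialize(List<Object[]> rows, AtomicInteger limitAccumulator, boolean emitMetrics, Emitter emitter)
  {
    final int initialSubqueryRows = limitAccumulator.get();
    for (Object[] row : rows) {
      limitAccumulator.incrementAndGet(); // counts toward the shared row guardrail
      // ... append the row to the inline datasource ...
    }
    if (emitMetrics) { // false during dry runs, so the metric is not reported twice
      emitter.emit("subquery/rows", limitAccumulator.get() - initialSubqueryRows);
    }
  }
}
```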
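Likewise, `worker/task/failed/count` and `worker/task/success/count` are not tracked as separate counters in `WorkerTaskManager`; they are derived by partitioning the already-maintained completed-task announcements by terminal status and counting per datasource. A simplified sketch of that derivation, with the announcement type reduced to a plain value class for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WorkerTaskCountSketch
{
  /** Illustrative stand-in for a completed TaskAnnouncement. */
  static final class CompletedTask
  {
    final String dataSource;
    final boolean success;

    CompletedTask(String dataSource, boolean success)
    {
      this.dataSource = dataSource;
      this.success = success;
    }
  }

  /** Counts completed tasks per datasource, keeping only those with the requested outcome. */
  static Map<String, Long> countPerDatasource(List<CompletedTask> completed, boolean success)
  {
    return completed.stream()
                    .filter(task -> task.success == success)
                    .collect(Collectors.groupingBy(task -> task.dataSource, Collectors.counting()));
  }
}
```

`WorkerTaskCountStatsMonitor` then turns each map entry into one event per datasource, which is why the new metrics only appear when that monitor is listed in `druid.monitoring.monitors`.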