mirror of https://github.com/apache/druid.git
commit fda0d63e44: Merge remote-tracking branch 'apache/master' into quidem-record
@@ -77,6 +77,8 @@ Most metric values reset each emission period, as specified in `druid.monitoring
 |`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|||
 |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
 |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
+|`subquery/rows`|Number of rows materialized by the subquery's results.|`id`, `subqueryId`|Varies|
+|`subquery/bytes`|Number of bytes materialized by the subquery's results. This metric is only emitted if the query uses [byte-based subquery guardrails](https://druid.apache.org/docs/latest/configuration/#guardrails-for-materialization-of-subqueries)|`id`, `subqueryId`|Varies|
 |`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
 |`subquery/byteLimit/count`|Number of subqueries whose results are materialized as frames (Druid's internal byte representation of rows).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
 |`subquery/fallback/count`|Number of subqueries which cannot be materialized as frames|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
@@ -297,6 +299,8 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
 |`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`category`, `workerVersion`|Varies|
 |`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`worker/task/failed/count`|Number of tasks that failed on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`worker/task/success/count`|Number of tasks that succeeded on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|

 ## Shuffle metrics (Native parallel task)
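As a hedged illustration (not part of the diff): the new `subquery/*` rows above are ordinary service metric events. The sketch below shows the rough shape of one emitted `subquery/rows` event; the field names mirror what the new ClientQuerySegmentWalkerTest assertions later in this diff read back ("metric", "host", "service", "id", "subQueryId", "value"), while the concrete host and service values here are invented.

```java
import java.util.Map;

// Hypothetical shape of one emitted subquery/rows event; values are examples only.
public class SubqueryMetricEventShape
{
  public static void main(String[] args)
  {
    Map<String, Object> event = Map.of(
        "metric", "subquery/rows",
        "host", "localhost:8082",   // assumption: a Broker host
        "service", "druid/broker",  // assumption
        "id", "query-id",
        "subQueryId", "1.1",
        "value", 3
    );
    System.out.println(event);
  }
}
```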
@@ -74,8 +74,8 @@
   "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" },
   "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" },
   "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
-  "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
-  "worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
+  "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" },
+  "worker/task/success/count" : { "dimensions" : ["category", "workerVersion", "dataSource"], "type" : "count" },
   "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
   "worker/taskSlot/total/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
   "worker/taskSlot/used/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },
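The two changed rows widen the dimension list for the failed/success counters so that the new `dataSource` dimension survives conversion. This file looks like a statsd-style metric-to-dimensions mapping, though its name is not visible in this extract. A hedged sketch of how such a whitelist is typically applied (the conversion logic itself is not part of this diff):

```java
import java.util.List;
import java.util.Map;

public class DimensionWhitelistSketch
{
  public static void main(String[] args)
  {
    // Only whitelisted dimensions survive into the converted metric.
    Map<String, List<String>> whitelist = Map.of(
        "worker/task/failed/count", List.of("category", "workerVersion", "dataSource")
    );

    Map<String, String> eventDims = Map.of(
        "category", "c1",
        "workerVersion", "v1",
        "dataSource", "wikipedia",
        "taskId", "dropped-dimension"   // not whitelisted, so it is ignored
    );

    whitelist.get("worker/task/failed/count")
             .forEach(dim -> System.out.println(dim + "=" + eventDims.get(dim)));
  }
}
```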
@@ -113,7 +113,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -588,19 +587,19 @@ public class SqlStatementResource
     MSQControllerTask msqControllerTask = getMSQControllerTaskAndCheckPermission(queryId, authenticationResult, forAction);
     SqlStatementState sqlStatementState = SqlStatementResourceHelper.getSqlStatementState(statusPlus);

-    Supplier<Optional<MSQTaskReportPayload>> msqTaskReportPayloadSupplier = () -> {
+    MSQTaskReportPayload taskReportPayload = null;
+    if (detail || SqlStatementState.FAILED == sqlStatementState) {
       try {
-        return Optional.ofNullable(SqlStatementResourceHelper.getPayload(
+        taskReportPayload = SqlStatementResourceHelper.getPayload(
             contactOverlord(overlordClient.taskReportAsMap(queryId), queryId)
-        ));
+        );
       }
       catch (DruidException e) {
-        if (e.getErrorCode().equals("notFound") || e.getMessage().contains("Unable to contact overlord")) {
-          return Optional.empty();
+        if (!e.getErrorCode().equals("notFound") && !e.getMessage().contains("Unable to contact overlord")) {
+          throw e;
         }
-        throw e;
       }
-    };
+    }

     if (SqlStatementState.FAILED == sqlStatementState) {
       return SqlStatementResourceHelper.getExceptionPayload(
@@ -608,7 +607,7 @@ public class SqlStatementResource
         taskResponse,
         statusPlus,
         sqlStatementState,
-        msqTaskReportPayloadSupplier.get().orElse(null),
+        taskReportPayload,
         jsonMapper,
         detail
     );
@@ -627,9 +626,9 @@ public class SqlStatementResource
             msqControllerTask.getQuerySpec().getDestination()
         ).orElse(null) : null,
         null,
-        detail ? SqlStatementResourceHelper.getQueryStagesReport(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
-        detail ? SqlStatementResourceHelper.getQueryCounters(msqTaskReportPayloadSupplier.get().orElse(null)) : null,
-        detail ? SqlStatementResourceHelper.getQueryWarningDetails(msqTaskReportPayloadSupplier.get().orElse(null)) : null
+        SqlStatementResourceHelper.getQueryStagesReport(taskReportPayload),
+        SqlStatementResourceHelper.getQueryCounters(taskReportPayload),
+        SqlStatementResourceHelper.getQueryWarningDetails(taskReportPayload)
     ));
   }
 }
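The refactor above replaces a lazily re-evaluated Supplier, whose `get()` was invoked at up to three call sites and could contact the Overlord each time, with a single eager fetch into a local `taskReportPayload`. A minimal runnable sketch of the difference, using a stand-in `fetchReport()` for `SqlStatementResourceHelper.getPayload(contactOverlord(...))` (the helper and call counting are illustrative, not the Druid API):

```java
import java.util.Optional;
import java.util.function.Supplier;

public class ReportFetchSketch
{
  private static int overlordCalls = 0;

  // Stand-in for SqlStatementResourceHelper.getPayload(contactOverlord(...)).
  private static Optional<String> fetchReport()
  {
    overlordCalls++;
    return Optional.of("payload");
  }

  public static void main(String[] args)
  {
    // Old shape: every get() re-runs the lambda, so three consumers mean three fetches.
    Supplier<Optional<String>> supplier = ReportFetchSketch::fetchReport;
    supplier.get();
    supplier.get();
    supplier.get();
    System.out.println(overlordCalls); // 3

    // New shape: fetch once up front (the diff's taskReportPayload), reuse everywhere.
    overlordCalls = 0;
    String payload = fetchReport().orElse(null);
    System.out.println(payload + ", calls=" + overlordCalls); // payload, calls=1
  }
}
```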
@@ -22,11 +22,13 @@ package org.apache.druid.msq.exec;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
@@ -34,8 +36,20 @@ import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.NestedDataTestUtils;
+import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.NullFilter;
+import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
@@ -44,8 +58,10 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.utils.CompressionUtils;
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -70,6 +86,7 @@ public class MSQComplexGroupByTest extends MSQTestBase
   private String dataFileNameJsonString;
   private String dataFileSignatureJsonString;
   private DataSource dataFileExternalDataSource;
+  private File dataFile;

   public static Collection<Object[]> data()
   {
@@ -85,9 +102,9 @@ public class MSQComplexGroupByTest extends MSQTestBase
   @BeforeEach
   public void setup() throws IOException
   {
-    File dataFile = newTempFile("dataFile");
+    dataFile = newTempFile("dataFile");
     final InputStream resourceStream = this.getClass().getClassLoader()
                                            .getResourceAsStream(NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE);
     final InputStream decompressing = CompressionUtils.decompress(
         resourceStream,
         "nested-all-types-test-data.json"
@@ -416,4 +433,185 @@ public class MSQComplexGroupByTest extends MSQTestBase
                          ))
                          .verifyResults();
   }

+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testExactCountDistinctOnNestedData(String contextName, Map<String, Object> context)
+  {
+    Assumptions.assumeTrue(NullHandling.sqlCompatible());
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("distinct_obj", ColumnType.LONG)
+                                            .build();
+
+    Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder()
+                                                      .putAll(context)
+                                                      .put(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)
+                                                      .build();
+
+    DimFilter innerFilter = NullHandling.replaceWithDefault()
+                            ? new SelectorDimFilter("d0", null, null)
+                            : new NullFilter("d0", null);
+
+    testSelectQuery().setSql("SELECT\n"
+                             + " COUNT(DISTINCT obj) AS distinct_obj\n"
+                             + "FROM TABLE(\n"
+                             + " EXTERN(\n"
+                             + " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n"
+                             + " '{\"type\": \"json\"}',\n"
+                             + " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"obj\", \"type\": \"COMPLEX<json>\"}]'\n"
+                             + " )\n"
+                             + " )\n"
+                             + " ORDER BY 1")
+                     .setQueryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false))
+                     .setExpectedMSQSpec(
+                         MSQSpec
+                             .builder()
+                             .query(
+                                 GroupByQuery
+                                     .builder()
+                                     .setDataSource(
+                                         new QueryDataSource(
+                                             GroupByQuery
+                                                 .builder()
+                                                 .setDataSource(dataFileExternalDataSource)
+                                                 .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                                 .setDimensions(
+                                                     new DefaultDimensionSpec("obj", "d0", ColumnType.NESTED_DATA)
+                                                 )
+                                                 .setGranularity(Granularities.ALL)
+                                                 .setContext(modifiedContext)
+                                                 .build()
+                                         )
+                                     )
+                                     .setAggregatorSpecs(
+                                         new FilteredAggregatorFactory(
+                                             new CountAggregatorFactory("a0"),
+                                             new NotDimFilter(innerFilter),
+                                             "a0"
+                                         )
+                                     )
+                                     .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                     .setGranularity(Granularities.ALL)
+                                     .setLimitSpec(new DefaultLimitSpec(
+                                         ImmutableList.of(
+                                             new OrderByColumnSpec(
+                                                 "a0",
+                                                 OrderByColumnSpec.Direction.ASCENDING,
+                                                 StringComparators.NUMERIC
+                                             )
+                                         ),
+                                         Integer.MAX_VALUE
+                                     ))
+                                     .setContext(modifiedContext)
+                                     .build()
+                             )
+                             .columnMappings(new ColumnMappings(ImmutableList.of(
+                                 new ColumnMapping("a0", "distinct_obj")
+                             )))
+                             .tuningConfig(MSQTuningConfig.defaultConfig())
+                             .destination(TaskReportMSQDestination.INSTANCE)
+                             .build()
+                     )
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(modifiedContext)
+                     .setExpectedResultRows(ImmutableList.of(
+                         new Object[]{7L}
+                     ))
+                     .verifyResults();
+  }
+
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testExactCountDistinctOnNestedData2(String contextName, Map<String, Object> context)
+  {
+    Assumptions.assumeTrue(NullHandling.sqlCompatible());
+    RowSignature dataFileSignature = RowSignature.builder()
+                                                 .add("timestamp", ColumnType.STRING)
+                                                 .add("cObj", ColumnType.NESTED_DATA)
+                                                 .build();
+    DataSource dataFileExternalDataSource2 = new ExternalDataSource(
+        new LocalInputSource(null, null, ImmutableList.of(dataFile), SystemFields.none()),
+        new JsonInputFormat(null, null, null, null, null),
+        dataFileSignature
+    );
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("distinct_obj", ColumnType.LONG)
+                                            .build();
+
+    Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder()
+                                                      .putAll(context)
+                                                      .put(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false)
+                                                      .build();
+
+    DimFilter innerFilter = NullHandling.replaceWithDefault()
+                            ? new SelectorDimFilter("d0", null, null)
+                            : new NullFilter("d0", null);
+
+    testSelectQuery().setSql("SELECT\n"
+                             + " COUNT(DISTINCT cObj) AS distinct_obj\n"
+                             + "FROM TABLE(\n"
+                             + " EXTERN(\n"
+                             + " '{ \"files\": [" + dataFileNameJsonString + "],\"type\":\"local\"}',\n"
+                             + " '{\"type\": \"json\"}',\n"
+                             + " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"cObj\", \"type\": \"COMPLEX<json>\"}]'\n"
+                             + " )\n"
+                             + " )\n"
+                             + " ORDER BY 1")
+                     .setQueryContext(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, false))
+                     .setExpectedMSQSpec(
+                         MSQSpec
+                             .builder()
+                             .query(
+                                 GroupByQuery
+                                     .builder()
+                                     .setDataSource(
+                                         new QueryDataSource(
+                                             GroupByQuery
+                                                 .builder()
+                                                 .setDataSource(dataFileExternalDataSource2)
+                                                 .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                                 .setDimensions(
+                                                     new DefaultDimensionSpec("cObj", "d0", ColumnType.NESTED_DATA)
+                                                 )
+                                                 .setGranularity(Granularities.ALL)
+                                                 .setContext(modifiedContext)
+                                                 .build()
+                                         )
+                                     )
+                                     .setAggregatorSpecs(
+                                         new FilteredAggregatorFactory(
+                                             new CountAggregatorFactory("a0"),
+                                             new NotDimFilter(innerFilter),
+                                             "a0"
+                                         )
+                                     )
+                                     .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                     .setGranularity(Granularities.ALL)
+                                     .setLimitSpec(new DefaultLimitSpec(
+                                         ImmutableList.of(
+                                             new OrderByColumnSpec(
+                                                 "a0",
+                                                 OrderByColumnSpec.Direction.ASCENDING,
+                                                 StringComparators.NUMERIC
+                                             )
+                                         ),
+                                         Integer.MAX_VALUE
+                                     ))
+                                     .setContext(modifiedContext)
+                                     .build()
+                             )
+                             .columnMappings(new ColumnMappings(ImmutableList.of(
+                                 new ColumnMapping("a0", "distinct_obj")
+                             )))
+                             .tuningConfig(MSQTuningConfig.defaultConfig())
+                             .destination(TaskReportMSQDestination.INSTANCE)
+                             .build()
+                     )
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(modifiedContext)
+                     .setExpectedResultRows(ImmutableList.of(
+                         new Object[]{1L}
+                     ))
+                     .verifyResults();
+  }
 }
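The two new tests force an exact COUNT(DISTINCT ...) over a COMPLEX<json> column by disabling approximate count-distinct in the query context, which is why the expected plan is a nested GroupByQuery: an inner grouping on the complex column followed by an outer filtered count. A small sketch of the context-override pattern the tests use; the literal key string below is an assumption about the value of `PlannerConfig.CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT`:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class ExactDistinctContextSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> baseContext = ImmutableMap.of("someExistingKey", "value");

    // Mirrors the tests' modifiedContext: keep the incoming context and pin
    // approximate count-distinct off so COUNT(DISTINCT obj) plans exactly.
    Map<String, Object> modifiedContext = ImmutableMap.<String, Object>builder()
        .putAll(baseContext)
        .put("useApproximateCountDistinct", false)  // assumed literal
        .build();

    System.out.println(modifiedContext);
  }
}
```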
@@ -640,6 +640,22 @@ public class WorkerTaskManager implements IndexerTaskCountStatsProvider
     return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource);
   }

+  @Override
+  public Map<String, Long> getWorkerFailedTasks()
+  {
+    return getNumTasksPerDatasource(completedTasks.entrySet().stream()
+                                                  .filter(entry -> entry.getValue().getTaskStatus().isFailure())
+                                                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
+  }
+
+  @Override
+  public Map<String, Long> getWorkerSuccessfulTasks()
+  {
+    return getNumTasksPerDatasource(completedTasks.entrySet().stream()
+                                                  .filter(entry -> entry.getValue().getTaskStatus().isSuccess())
+                                                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
+  }
+
   private static class TaskDetails
   {
     private final Task task;
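The two new provider methods above count completed tasks per datasource after filtering by terminal status. A compact, self-contained sketch of the same filter-then-count-per-datasource pattern, with simplified stand-in types rather than the Druid classes:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TaskCountSketch
{
  record CompletedTask(String dataSource, boolean success) {}

  // Filter completed tasks by outcome, then count per datasource.
  static Map<String, Long> countPerDatasource(List<CompletedTask> completed, boolean wantSuccess)
  {
    return completed.stream()
                    .filter(t -> t.success() == wantSuccess)
                    .collect(Collectors.groupingBy(CompletedTask::dataSource, Collectors.counting()));
  }

  public static void main(String[] args)
  {
    List<CompletedTask> done = List.of(
        new CompletedTask("wikipedia", true),
        new CompletedTask("wikipedia", true),
        new CompletedTask("animals", false)
    );
    System.out.println(countPerDatasource(done, false)); // {animals=1}
    System.out.println(countPerDatasource(done, true));  // {wikipedia=2}
  }
}
```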
@@ -455,6 +455,19 @@ public class WorkerTaskManagerTest
     return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
   }

+  private NoopTask createNoopFailingTask(String id, String dataSource)
+  {
+    return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
+    {
+      @Override
+      public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+      {
+        Thread.sleep(getRunTime());
+        return TaskStatus.failure(getId(), "Failed to complete the task");
+      }
+    };
+  }
+
   /**
    * Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble
    * for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
@@ -494,7 +507,7 @@ public class WorkerTaskManagerTest

     Task task1 = createNoopTask("task1", "wikipedia");
     Task task2 = createNoopTask("task2", "wikipedia");
-    Task task3 = createNoopTask("task3", "animals");
+    Task task3 = createNoopFailingTask("task3", "animals");

     workerTaskManager.start();
     // befor assigning tasks we should get no running tasks
@@ -517,11 +530,19 @@ public class WorkerTaskManagerTest
       Thread.sleep(10);
     } while (!runningTasks.isEmpty());

-    // When running tasks are empty all task should be reported as completed
+    // When running tasks are empty all task should be reported as completed and
+    // one of the task for animals datasource should fail and other 2 tasks in
+    // the wikipedia datasource should succeed
     Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of(
         "wikipedia", 2L,
         "animals", 1L
     ));
+    Assert.assertEquals(workerTaskManager.getWorkerFailedTasks(), ImmutableMap.of(
+        "animals", 1L
+    ));
+    Assert.assertEquals(workerTaskManager.getWorkerSuccessfulTasks(), ImmutableMap.of(
+        "wikipedia", 2L
+    ));
     Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
   }
 }
@@ -150,12 +150,16 @@ public class ComplexFieldReader implements FieldReader
     private final Memory memory;
     private final ReadableFieldPointer fieldPointer;
     private final ComplexMetricSerde serde;
+    @SuppressWarnings("rawtypes")
+    private final Class clazz;

     private Selector(Memory memory, ReadableFieldPointer fieldPointer, ComplexMetricSerde serde)
     {
       this.memory = memory;
       this.fieldPointer = fieldPointer;
       this.serde = serde;
+      //noinspection deprecation
+      this.clazz = serde.getObjectStrategy().getClazz();
     }

     @Nullable
@@ -169,7 +173,8 @@ public class ComplexFieldReader implements FieldReader
     @Override
     public Class<T> classOfObject()
     {
-      return serde.getExtractor().extractedClass();
+      //noinspection unchecked
+      return clazz;
     }

     @Override
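This reader (and the frame column reader below) now resolves the object class once from the serde's ObjectStrategy in the constructor instead of calling `serde.getExtractor().extractedClass()` on every `classOfObject()` call. A minimal sketch of that constructor-caching pattern, using generic stand-ins rather than the Druid types:

```java
import java.util.function.Supplier;

public class CachedClassSketch
{
  private final Class<?> clazz;

  // The supplier stands in for serde.getObjectStrategy()::getClazz; it is
  // consulted exactly once, at construction time.
  public CachedClassSketch(Supplier<Class<?>> classSupplier)
  {
    this.clazz = classSupplier.get();
  }

  public Class<?> classOfObject()
  {
    return clazz; // no per-call lookup
  }

  public static void main(String[] args)
  {
    CachedClassSketch reader = new CachedClassSketch(() -> String.class);
    System.out.println(reader.classOfObject()); // class java.lang.String
  }
}
```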
@@ -124,6 +124,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader
   {
     private final Frame frame;
     private final ComplexMetricSerde serde;
+    private final Class<?> clazz;
     private final Memory memory;
     private final long startOfOffsetSection;
     private final long startOfDataSection;
@@ -138,6 +139,8 @@ public class ComplexFrameColumnReader implements FrameColumnReader
     {
       this.frame = frame;
       this.serde = serde;
+      //noinspection deprecation
+      this.clazz = serde.getObjectStrategy().getClazz();
       this.memory = memory;
       this.startOfOffsetSection = startOfOffsetSection;
       this.startOfDataSection = startOfDataSection;
@@ -158,7 +161,7 @@ public class ComplexFrameColumnReader implements FrameColumnReader
     @Override
     public Class<?> classOfObject()
     {
-      return serde.getExtractor().extractedClass();
+      return clazz;
     }

     @Override
@@ -32,6 +32,7 @@ public class DruidMetrics
   public static final String TYPE = "type";
   public static final String INTERVAL = "interval";
   public static final String ID = "id";
+  public static final String SUBQUERY_ID = "subQueryId";
   public static final String TASK_ID = "taskId";
   public static final String GROUP_ID = "groupId";
   public static final String STATUS = "status";
@@ -37,7 +37,9 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.FluentQueryRunner;
 import org.apache.druid.query.FrameBasedInlineDataSource;
 import org.apache.druid.query.FrameSignaturePair;
@@ -94,6 +96,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker

   private static final Logger log = new Logger(ClientQuerySegmentWalker.class);
   private static final int FRAME_SIZE = 8_000_000;
+  public static final String ROWS_COUNT_METRIC = "subquery/rows";
+  public static final String BYTES_COUNT_METRIC = "subquery/bytes";

   private final ServiceEmitter emitter;
   private final QuerySegmentWalker clusterClient;
@@ -447,7 +451,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
             maxSubqueryRows,
             maxSubqueryMemory,
             useNestedForUnknownTypeInSubquery,
-            subqueryStatsProvider
+            subqueryStatsProvider,
+            !dryRun,
+            emitter
         );
       } else {
         // Cannot inline subquery. Attempt to inline one level deeper, and then try again.
@@ -553,8 +559,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
    * It also plumbs parent query's id and sql id in case the subqueries don't have it set by default
    *
    * @param rootDataSource Datasource whose subqueries need to be populated
-   * @param parentQueryId Parent Query's ID, can be null if do not need to update this in the subqueries
-   * @param parentSqlQueryId Parent Query's SQL Query ID, can be null if do not need to update this in the subqueries
+   * @param parentQueryId Parent Query's ID, can be null if it does not need to update this in the subqueries
+   * @param parentSqlQueryId Parent Query's SQL Query ID, can be null if it does not need to update this in the subqueries
    * @return DataSource populated with the subqueries
    */
   private DataSource generateSubqueryIds(
@@ -642,6 +648,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
   }

   /**
+   */
+  /**
+   *
    * Convert the results of a particular query into a materialized (List-based) InlineDataSource.
    *
    * @param query the query
@@ -651,6 +660,13 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
    * particular master query
    * @param limit user-configured limit. If negative, will be treated as {@link Integer#MAX_VALUE}.
    *              If zero, this method will throw an error immediately.
+   * @param memoryLimit User configured byte limit.
+   * @param useNestedForUnknownTypeInSubquery Uses nested json for unknown types when materializing subquery results
+   * @param subqueryStatsProvider Statistics about the subquery materialization
+   * @param emitMetrics Flag to control if the metrics need to be emitted while materializing. The metrics are omitted
+   *                    when we are performing a dry run of the query to avoid double reporting the same metric incorrectly
+   * @param emitter Metrics emitter
+   * @return Inlined datasource represented by the provided results
    * @throws ResourceLimitExceededException if the limit is exceeded
    */
   private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
@@ -662,8 +678,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       final AtomicBoolean cannotMaterializeToFrames,
       final int limit,
       long memoryLimit,
-      boolean useNestedForUnknownTypeInSubquery,
-      SubqueryCountStatsProvider subqueryStatsProvider
+      final boolean useNestedForUnknownTypeInSubquery,
+      final SubqueryCountStatsProvider subqueryStatsProvider,
+      final boolean emitMetrics,
+      final ServiceEmitter emitter
   )
   {
     final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@@ -683,7 +701,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
             toolChest,
             limitAccumulator,
             limit,
-            subqueryStatsProvider
+            subqueryStatsProvider,
+            emitMetrics,
+            emitter
         );
         break;
       case MEMORY_LIMIT:
@@ -699,7 +719,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
             memoryLimitAccumulator,
             memoryLimit,
             useNestedForUnknownTypeInSubquery,
-            subqueryStatsProvider
+            subqueryStatsProvider,
+            emitMetrics,
+            emitter
         );
         if (!maybeDataSource.isPresent()) {
           cannotMaterializeToFrames.set(true);
@@ -716,7 +738,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
             toolChest,
             limitAccumulator,
             limit,
-            subqueryStatsProvider
+            subqueryStatsProvider,
+            emitMetrics,
+            emitter
         );
       } else {
         subqueryStatsProvider.incrementSubqueriesWithByteLimit();
@@ -739,9 +763,11 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final AtomicLong memoryLimitAccumulator,
-      long memoryLimit,
-      boolean useNestedForUnknownTypeInSubquery,
-      final SubqueryCountStatsProvider subqueryStatsProvider
+      final long memoryLimit,
+      final boolean useNestedForUnknownTypeInSubquery,
+      final SubqueryCountStatsProvider subqueryStatsProvider,
+      final boolean emitMetrics,
+      final ServiceEmitter emitter
   )
   {
     Optional<Sequence<FrameSignaturePair>> framesOptional;
@@ -764,6 +790,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker

       startedAccumulating = true;

+      final int initialSubqueryRows = limitAccumulator.get();
+      final long initialSubqueryBytes = memoryLimitAccumulator.get();
       frames.forEach(
           frame -> {
             limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -775,6 +803,21 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
             frameSignaturePairs.add(frame);
           }
       );
+      if (emitMetrics) {
+        emitter.emit(
+            ServiceMetricEvent.builder()
+                              .setDimension(DruidMetrics.ID, query.getId())
+                              .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
+                              .setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
+        );
+
+        emitter.emit(
+            ServiceMetricEvent.builder()
+                              .setDimension(DruidMetrics.ID, query.getId())
+                              .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
+                              .setMetric(BYTES_COUNT_METRIC, memoryLimitAccumulator.get() - initialSubqueryBytes)
+        );
+      }
       return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
     }
     catch (UnsupportedColumnTypeException e) {
@@ -811,7 +854,9 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
       final int limit,
-      final SubqueryCountStatsProvider subqueryStatsProvider
+      final SubqueryCountStatsProvider subqueryStatsProvider,
+      boolean emitMetrics,
+      final ServiceEmitter emitter
   )
   {
     final int rowLimitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
@@ -819,6 +864,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker

     final ArrayList<Object[]> resultList = new ArrayList<>();

+    final int initialSubqueryRows = limitAccumulator.get();
     toolChest.resultsAsArrays(query, results).accumulate(
         resultList,
         (acc, in) -> {
@@ -830,6 +876,14 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
           return acc;
         }
     );
+    if (emitMetrics) {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+                            .setDimension(DruidMetrics.ID, query.getId())
+                            .setDimension(DruidMetrics.SUBQUERY_ID, query.getSubQueryId())
+                            .setMetric(ROWS_COUNT_METRIC, limitAccumulator.get() - initialSubqueryRows)
+      );
+    }
    return InlineDataSource.fromIterable(resultList, signature);
   }
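The snapshot/delta pattern in the hunks above is worth spelling out: the row and byte accumulators are shared across all subqueries of one parent query, so each materialization records the accumulator value before accumulating and emits only its own contribution. A runnable sketch:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DeltaMetricSketch
{
  // Returns the value that would be emitted as subquery/rows for this subquery.
  static int materializeAndMeasure(AtomicInteger limitAccumulator, int rowsThisSubquery)
  {
    final int initialSubqueryRows = limitAccumulator.get(); // snapshot, as in the diff
    limitAccumulator.addAndGet(rowsThisSubquery);           // stands in for the accumulate loop
    return limitAccumulator.get() - initialSubqueryRows;    // delta for this subquery only
  }

  public static void main(String[] args)
  {
    AtomicInteger shared = new AtomicInteger(100); // rows from earlier subqueries
    System.out.println(materializeAndMeasure(shared, 3)); // prints 3, not 103
  }
}
```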
@@ -41,4 +41,8 @@ public interface IndexerTaskCountStatsProvider
    * Map from datasource name to the number of completed tasks by the Indexer.
    */
   Map<String, Long> getWorkerCompletedTasks();
+
+  Map<String, Long> getWorkerFailedTasks();
+
+  Map<String, Long> getWorkerSuccessfulTasks();
 }
@@ -72,6 +72,8 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor
       emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks());
       emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks());
       emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks());
+      emit(emitter, "worker/task/failed/count", indexerStatsProvider.getWorkerFailedTasks());
+      emit(emitter, "worker/task/success/count", indexerStatsProvider.getWorkerSuccessfulTasks());
     }
     return true;
   }
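Each `emit(...)` call above fans one Map<String, Long> out into one event per datasource. A hedged sketch of that fan-out; the `emitPerDatasource` helper and `MetricSink` interface are illustrative stand-ins, not Druid API:

```java
import java.util.Map;

public class TaskCountEmitSketch
{
  interface MetricSink
  {
    void emit(String metric, String dimensionKey, String dimensionValue, long value);
  }

  // One metric event per map entry, dimensioned by dataSource.
  static void emitPerDatasource(MetricSink sink, String metricName, Map<String, Long> counts)
  {
    for (Map.Entry<String, Long> entry : counts.entrySet()) {
      sink.emit(metricName, "dataSource", entry.getKey(), entry.getValue());
    }
  }

  public static void main(String[] args)
  {
    emitPerDatasource(
        (metric, k, v, value) -> System.out.println(metric + "{" + k + "=" + v + "} " + value),
        "worker/task/failed/count",
        Map.of("movies", 4L, "games", 6L)
    );
  }
}
```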
@@ -32,8 +32,12 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FrameBasedInlineDataSource;
 import org.apache.druid.query.GlobalTableDataSource;
@@ -214,6 +218,7 @@ public class ClientQuerySegmentWalkerTest

   private Closer closer;
   private QueryRunnerFactoryConglomerate conglomerate;
+  private final StubServiceEmitter emitter = new StubServiceEmitter();

   // Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter.
   private final List<ExpectedQuery> issuedQueries = new ArrayList<>();
@@ -228,6 +233,7 @@ public class ClientQuerySegmentWalkerTest
   public void setUp()
   {
     closer = Closer.create();
+    emitter.flush();
     conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
     scheduler = new ObservableQueryScheduler(
         8,
@@ -941,6 +947,113 @@ public class ClientQuerySegmentWalkerTest
     testQuery(query, ImmutableList.of(), ImmutableList.of());
   }

+  @Test
+  public void testMetricsWithMaxSubqueryRowsEnabled()
+  {
+    final GroupByQuery subquery =
+        GroupByQuery.builder()
+                    .setDataSource(FOO)
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Collections.singletonList(INTERVAL))
+                    .setDimensions(DefaultDimensionSpec.of("s"))
+                    .build();
+
+    final TimeseriesQuery query =
+        (TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
+                                .dataSource(new QueryDataSource(subquery))
+                                .granularity(Granularities.ALL)
+                                .intervals(Intervals.ONLY_ETERNITY)
+                                .aggregators(new CountAggregatorFactory("cnt"))
+                                .build()
+                                .withId(DUMMY_QUERY_ID);
+
+    testQuery(
+        query,
+        new ArrayList<>(ImmutableList.of(
+            ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
+                        RowSignature.builder().add("s", ColumnType.STRING).build()
+                    )
+                )
+            )
+        )),
+        ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
+    );
+
+    List<Event> events = emitter.getEvents();
+
+    for (Event event : events) {
+      EventMap map = event.toMap();
+      if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
+        Assert.assertTrue(map.containsKey("host"));
+        Assert.assertTrue(map.containsKey("service"));
+        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+        Assert.assertEquals(3, map.get("value"));
+      }
+    }
+  }
+
+  @Test
+  public void testMetricsWithMaxSubqueryBytesEnabled()
+  {
+    final GroupByQuery subquery =
+        GroupByQuery.builder()
+                    .setDataSource(FOO)
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Collections.singletonList(INTERVAL))
+                    .setDimensions(DefaultDimensionSpec.of("s"))
+                    .build();
+
+    final TimeseriesQuery query =
+        (TimeseriesQuery) Druids.newTimeseriesQueryBuilder()
+                                .dataSource(new QueryDataSource(subquery))
+                                .granularity(Granularities.ALL)
+                                .intervals(Intervals.ONLY_ETERNITY)
+                                .aggregators(new CountAggregatorFactory("cnt"))
+                                .context(ImmutableMap.of(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "10000"))
+                                .build()
+                                .withId(DUMMY_QUERY_ID);
+
+    testQuery(
+        query,
+        new ArrayList<>(ImmutableList.of(
+            ExpectedQuery.cluster(subquery.withId(DUMMY_QUERY_ID).withSubQueryId("1.1")),
+            ExpectedQuery.local(
+                query.withDataSource(
+                    InlineDataSource.fromIterable(
+                        ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
+                        RowSignature.builder().add("s", ColumnType.STRING).build()
+                    )
+                )
+            )
+        )),
+        ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
+    );
+
+    List<Event> events = emitter.getEvents();
+
+    for (Event event : events) {
+      EventMap map = event.toMap();
+      if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) {
+        Assert.assertTrue(map.containsKey("host"));
+        Assert.assertTrue(map.containsKey("service"));
+        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+        Assert.assertEquals(3, map.get("value"));
+      } else if (ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) {
+        Assert.assertTrue(map.containsKey("host"));
+        Assert.assertTrue(map.containsKey("service"));
+        Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID));
+        Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID));
+        Assert.assertEquals(43L, map.get("value"));
+      }
+    }
+  }
+
   @Test
   public void testGroupByOnArraysDoubles()
   {
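The bytes test differs from the rows test only in setting the byte guardrail in the query context, which switches materialization to frames so `subquery/bytes` fires alongside `subquery/rows`. A sketch of that context flag; the literal key "maxSubqueryBytes" is an assumption about the value of `QueryContexts.MAX_SUBQUERY_BYTES_KEY`:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class SubqueryBytesContextSketch
{
  public static void main(String[] args)
  {
    // Assumed literal for QueryContexts.MAX_SUBQUERY_BYTES_KEY.
    Map<String, Object> context = ImmutableMap.of("maxSubqueryBytes", "10000");
    // With this set, subquery results materialize as frames and the
    // subquery/bytes metric is emitted in addition to subquery/rows.
    System.out.println(context);
  }
}
```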
@@ -1545,13 +1658,15 @@ public class ClientQuerySegmentWalkerTest
                 conglomerate,
                 segmentWrangler,
                 joinableFactoryWrapper,
-                schedulerForTest
+                schedulerForTest,
+                emitter
             ),
             ClusterOrLocal.LOCAL
         ),
         conglomerate,
         joinableFactory,
-        serverConfig
+        serverConfig,
+        emitter
     );
   }
@@ -87,7 +87,6 @@ import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.join.LookupJoinableFactory;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
 import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
@@ -117,7 +116,6 @@ public class QueryStackTests

   public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;

-  private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
   private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;

   private QueryStackTests()
@@ -131,11 +129,12 @@ public class QueryStackTests
       final QuerySegmentWalker localWalker,
       final QueryRunnerFactoryConglomerate conglomerate,
       final JoinableFactory joinableFactory,
-      final ServerConfig serverConfig
+      final ServerConfig serverConfig,
+      final ServiceEmitter emitter
   )
   {
     return new ClientQuerySegmentWalker(
-        EMITTER,
+        emitter,
         clusterWalker,
         localWalker,
         new QueryToolChestWarehouse()
@@ -172,7 +171,8 @@ public class QueryStackTests
       final QueryRunnerFactoryConglomerate conglomerate,
       final SegmentWrangler segmentWrangler,
       final JoinableFactoryWrapper joinableFactoryWrapper,
-      final QueryScheduler scheduler
+      final QueryScheduler scheduler,
+      final ServiceEmitter emitter
   )
   {
     return new LocalQuerySegmentWalker(
@@ -180,7 +180,7 @@ public class QueryStackTests
         segmentWrangler,
         joinableFactoryWrapper,
         scheduler,
-        EMITTER
+        emitter
     );
   }
@@ -49,6 +49,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.Interval;
@@ -133,11 +134,13 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
             conglomerate,
             segmentWrangler,
             joinableFactoryWrapper,
-            scheduler
+            scheduler,
+            new NoopServiceEmitter()
         ),
         conglomerate,
         joinableFactoryWrapper.getJoinableFactory(),
-        new ServerConfig()
+        new ServerConfig(),
+        new NoopServiceEmitter()
         )
     );
   }
@@ -120,6 +120,24 @@ public class WorkerTaskCountStatsMonitorTest
           "metrics", 9L
       );
     }
+
+    @Override
+    public Map<String, Long> getWorkerFailedTasks()
+    {
+      return ImmutableMap.of(
+          "movies", 4L,
+          "games", 6L
+      );
+    }
+
+    @Override
+    public Map<String, Long> getWorkerSuccessfulTasks()
+    {
+      return ImmutableMap.of(
+          "games", 23L,
+          "inventory", 89L
+      );
+    }
   };

   nullStatsProvider = new WorkerTaskCountStatsProvider()
@@ -239,7 +257,7 @@ public class WorkerTaskCountStatsMonitorTest
         new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER));
     final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(6, emitter.getEvents().size());
+    Assert.assertEquals(10, emitter.getEvents().size());
     emitter.verifyValue(
         "worker/task/running/count",
         ImmutableMap.of("dataSource", "wikipedia"),
@@ -270,6 +288,26 @@ public class WorkerTaskCountStatsMonitorTest
         ImmutableMap.of("dataSource", "metrics"),
         9L
     );
+    emitter.verifyValue(
+        "worker/task/failed/count",
+        ImmutableMap.of("dataSource", "movies"),
+        4L
+    );
+    emitter.verifyValue(
+        "worker/task/failed/count",
+        ImmutableMap.of("dataSource", "games"),
+        6L
+    );
+    emitter.verifyValue(
+        "worker/task/success/count",
+        ImmutableMap.of("dataSource", "games"),
+        23L
+    );
+    emitter.verifyValue(
+        "worker/task/success/count",
+        ImmutableMap.of("dataSource", "inventory"),
+        89L
+    );
   }
   @Test
   public void testMonitorWithNulls()
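Why the expected event count moves from 6 to 10: each new metric emits one event per datasource entry in its map, so two `failed` entries plus two `success` entries join the six pre-existing events. A one-line check of that arithmetic:

```java
public class EventCountCheck
{
  public static void main(String[] args)
  {
    int existing = 6;  // running, assigned, completed events from before
    int failed = 2;    // movies, games
    int success = 2;   // games, inventory
    System.out.println(existing + failed + success); // 10
  }
}
```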