Add metrics to the native queries underpinning SQL. (#4561)

* Add metrics to the native queries underpinning SQL.

This is done by factoring out the metrics and request log emitting
code from QueryResource into a new QueryLifecycle class. That class
is used by both QueryResource and the SQL DruidSchema and QueryMaker.

Also fixes a couple of bugs in QueryResource:

- RequestLogLine start time was set to `TimeUnit.NANOSECONDS.toMillis(startNs)`,
  which is incorrect since absolute nanos cannot be converted to millis.
- DruidMetrics.makeRequestMetrics was called with null `query` on
  unparseable queries, which led to spurious "Unable to log query"
  errors.

Partial fix for #4047.

* Code style

* Remove unused imports.

* Fix tests.

* Remove unused import.
This commit is contained in:
Gian Merlino 2017-07-24 21:26:27 -07:00 committed by Jonathan Wei
parent 8a4185897e
commit 5048ab3e96
20 changed files with 714 additions and 414 deletions

View File

@ -38,7 +38,6 @@ import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.QueryableIndex;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.planner.DruidPlanner;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.planner.PlannerFactory;
@ -116,11 +115,10 @@ public class SqlBenchmark
plannerFactory = new PlannerFactory(
CalciteTests.createMockSchema(walker, plannerConfig),
walker,
CalciteTests.createMockQueryLifecycleFactory(walker),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,
new ServerConfig()
plannerConfig
);
groupByQuery = GroupByQuery
.builder()

View File

@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Druids;
import io.druid.query.QueryContexts;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
@ -45,7 +44,6 @@ import io.druid.segment.TestHelper;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
@ -131,11 +129,10 @@ public class QuantileSqlAggregatorTest
);
plannerFactory = new PlannerFactory(
druidSchema,
walker,
CalciteTests.createMockQueryLifecycleFactory(walker),
operatorTable,
CalciteTests.createExprMacroTable(),
plannerConfig,
new ServerConfig()
plannerConfig
);
}
@ -223,11 +220,7 @@ public class QuantileSqlAggregatorTest
new QuantilePostAggregator("a7", "a5:agg", 0.999f),
new QuantilePostAggregator("a8", "a8:agg", 0.50f)
))
.context(ImmutableMap.<String, Object>of(
"skipEmptyBuckets", true,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
))
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
@ -287,11 +280,7 @@ public class QuantileSqlAggregatorTest
new QuantilePostAggregator("a5", "a5:agg", 0.999f),
new QuantilePostAggregator("a6", "a4:agg", 0.999f)
))
.context(ImmutableMap.<String, Object>of(
"skipEmptyBuckets", true,
QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
))
.context(ImmutableMap.<String, Object>of("skipEmptyBuckets", true))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);

View File

@ -128,6 +128,15 @@ public class DirectDruidClient<T> implements QueryRunner<T>
);
}
/**
* Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}.
*/
public static void removeMagicResponseContextFields(Map<String, Object> responseContext)
{
responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME);
responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED);
}
public static Map<String, Object> makeResponseContextForQuery(Query query, long startTimeMillis)
{
final Map<String, Object> responseContext = new MapMaker().makeMap();

View File

@ -22,19 +22,13 @@ package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.client.ServerViewUtil;
import io.druid.client.TimelineServerView;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.Query;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.http.security.StateResourceFilter;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.AuthConfig;
import javax.servlet.http.HttpServletRequest;
@ -59,30 +53,20 @@ public class BrokerQueryResource extends QueryResource
@Inject
public BrokerQueryResource(
QueryToolChestWarehouse warehouse,
ServerConfig config,
QueryLifecycleFactory queryLifecycleFactory,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryManager queryManager,
AuthConfig authConfig,
GenericQueryMetricsFactory queryMetricsFactory,
TimelineServerView brokerServerView
)
{
super(
warehouse,
config,
queryLifecycleFactory,
jsonMapper,
smileMapper,
texasRanger,
emitter,
requestLogger,
queryManager,
authConfig,
queryMetricsFactory
authConfig
);
this.brokerServerView = brokerServerView;
}

View File

@ -0,0 +1,363 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import com.google.common.base.Strings;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DirectDruidClient;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthConfig;
import io.druid.server.security.AuthorizationInfo;
import io.druid.server.security.Resource;
import io.druid.server.security.ResourceType;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Class that helps a Druid server (broker, historical, etc) manage the lifecycle of a query that it is handling. It
* ensures that a query goes through the following stages, in the proper order:
*
* <ol>
* <li>Initialization ({@link #initialize(Query)})</li>
* <li>Authorization ({@link #authorize(AuthorizationInfo)}</li>
* <li>Execution ({@link #execute()}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long)}</li>
* </ol>
*
* This object is not thread-safe.
*/
public class QueryLifecycle
{
private static final Logger log = new Logger(QueryLifecycle.class);
private final QueryToolChestWarehouse warehouse;
private final QuerySegmentWalker texasRanger;
private final GenericQueryMetricsFactory queryMetricsFactory;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final ServerConfig serverConfig;
private final AuthConfig authConfig;
private final long startMs;
private final long startNs;
private State state = State.NEW;
private QueryToolChest toolChest;
private QueryPlus queryPlus;
public QueryLifecycle(
final QueryToolChestWarehouse warehouse,
final QuerySegmentWalker texasRanger,
final GenericQueryMetricsFactory queryMetricsFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final ServerConfig serverConfig,
final AuthConfig authConfig,
final long startMs,
final long startNs
)
{
this.warehouse = warehouse;
this.texasRanger = texasRanger;
this.queryMetricsFactory = queryMetricsFactory;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.serverConfig = serverConfig;
this.authConfig = authConfig;
this.startMs = startMs;
this.startNs = startNs;
}
/**
* For callers where simplicity is desired over flexibility. This method does it all in one call. If the request
* is unauthorized, an IllegalStateException will be thrown. Logs and metrics are emitted when the Sequence is
* either fully iterated or throws an exception.
*
* @param query the query
* @param authorizationInfo authorization info from the request; or null if none is present. This must be non-null
* if security is enabled, or the request will be considered unauthorized.
* @param remoteAddress remote address, for logging; or null if unknown
*
* @return results
*/
@SuppressWarnings("unchecked")
public <T> Sequence<T> runSimple(
final Query<T> query,
@Nullable final AuthorizationInfo authorizationInfo,
@Nullable final String remoteAddress
)
{
initialize(query);
final Sequence<T> results;
try {
final Access access = authorize(authorizationInfo);
if (!access.isAllowed()) {
throw new ISE("Unauthorized");
}
final QueryLifecycle.QueryResponse queryResponse = execute();
results = queryResponse.getResults();
}
catch (Throwable e) {
emitLogsAndMetrics(e, remoteAddress, -1);
throw e;
}
return Sequences.wrap(
results,
new SequenceWrapper()
{
@Override
public void after(final boolean isDone, final Throwable thrown) throws Exception
{
emitLogsAndMetrics(thrown, remoteAddress, -1);
}
}
);
}
/**
* Initializes this object to execute a specific query. Does not actually execute the query.
*
* @param baseQuery the query
*/
@SuppressWarnings("unchecked")
public void initialize(final Query baseQuery)
{
transition(State.NEW, State.INITIALIZED);
String queryId = baseQuery.getId();
if (queryId == null) {
queryId = UUID.randomUUID().toString();
}
this.queryPlus = QueryPlus.wrap(
(Query) DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery.withId(queryId),
serverConfig
)
);
this.toolChest = warehouse.getToolChest(baseQuery);
}
/**
* Authorize the query. Will return an Access object denoting whether the query is authorized or not.
*
* @param authorizationInfo authorization info from the request; or null if none is present. This must be non-null
* if security is enabled, or the request will be considered unauthorized.
*
* @return authorization result
*
* @throws IllegalStateException if security is enabled and authorizationInfo is null
*/
public Access authorize(@Nullable final AuthorizationInfo authorizationInfo)
{
transition(State.INITIALIZED, State.AUTHORIZING);
if (authConfig.isEnabled()) {
// This is an experimental feature, see - https://github.com/druid-io/druid/pull/2424
if (authorizationInfo != null) {
for (String dataSource : queryPlus.getQuery().getDataSource().getNames()) {
Access authResult = authorizationInfo.isAuthorized(
new Resource(dataSource, ResourceType.DATASOURCE),
Action.READ
);
if (!authResult.isAllowed()) {
// Not authorized; go straight to Jail, do not pass Go.
transition(State.AUTHORIZING, State.DONE);
return authResult;
}
}
transition(State.AUTHORIZING, State.AUTHORIZED);
return new Access(true);
} else {
throw new ISE("WTF?! Security is enabled but no authorization info found in the request");
}
} else {
transition(State.AUTHORIZING, State.AUTHORIZED);
return new Access(true);
}
}
/**
* Execute the query. Can only be called if the query has been authorized. Note that query logs and metrics will
* not be emitted automatically when the Sequence is fully iterated. It is the caller's responsibility to call
* {@link #emitLogsAndMetrics(Throwable, String, long)} to emit logs and metrics.
*
* @return result sequence and response context
*/
public QueryResponse execute()
{
transition(State.AUTHORIZED, State.EXECUTING);
final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery(
queryPlus.getQuery(),
System.currentTimeMillis()
);
final Sequence res = queryPlus.run(texasRanger, responseContext);
return new QueryResponse(res == null ? Sequences.empty() : res, responseContext);
}
/**
* Emit logs and metrics for this query.
*
* @param e exception that occurred while processing this query
* @param remoteAddress remote address, for logging; or null if unknown
* @param bytesWritten number of bytes written; will become a query/bytes metric if >= 0
*/
@SuppressWarnings("unchecked")
public void emitLogsAndMetrics(
@Nullable final Throwable e,
@Nullable final String remoteAddress,
final long bytesWritten
)
{
if (queryPlus == null) {
// Never initialized, don't log or emit anything.
return;
}
if (state == State.DONE) {
log.warn("Tried to emit logs and metrics twice for query[%s]!", queryPlus.getQuery().getId());
}
state = State.DONE;
final Query query = queryPlus != null ? queryPlus.getQuery() : null;
final boolean success = e == null;
try {
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
toolChest,
queryPlus.getQuery(),
Strings.nullToEmpty(remoteAddress)
);
queryMetrics.success(success);
queryMetrics.reportQueryTime(queryTimeNs);
if (bytesWritten >= 0) {
queryMetrics.reportQueryBytes(bytesWritten);
}
queryMetrics.emit(emitter);
final Map<String, Object> statsMap = new LinkedHashMap<>();
statsMap.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
statsMap.put("query/bytes", bytesWritten);
statsMap.put("success", success);
if (e != null) {
statsMap.put("exception", e.toString());
if (e instanceof QueryInterruptedException) {
// Mimic behavior from QueryResource, where this code was originally taken from.
log.warn(e, "Exception while processing queryId [%s]", queryPlus.getQuery().getId());
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());
}
}
requestLogger.log(
new RequestLogLine(
new DateTime(startMs),
Strings.nullToEmpty(remoteAddress),
queryPlus.getQuery(),
new QueryStats(statsMap)
)
);
}
catch (Exception ex) {
log.error(ex, "Unable to log query [%s]!", query);
}
}
public Query getQuery()
{
return queryPlus.getQuery();
}
private void transition(final State from, final State to)
{
if (state != from) {
throw new ISE("Cannot transition from[%s] to[%s].", from, to);
}
state = to;
}
enum State
{
NEW,
INITIALIZED,
AUTHORIZING,
AUTHORIZED,
EXECUTING,
DONE
}
public static class QueryResponse
{
private final Sequence results;
private final Map<String, Object> responseContext;
private QueryResponse(final Sequence results, final Map<String, Object> responseContext)
{
this.results = results;
this.responseContext = responseContext;
}
public Sequence getResults()
{
return results;
}
public Map<String, Object> getResponseContext()
{
return responseContext;
}
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.LazySingleton;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.AuthConfig;
@LazySingleton
public class QueryLifecycleFactory
{
private final QueryToolChestWarehouse warehouse;
private final QuerySegmentWalker texasRanger;
private final GenericQueryMetricsFactory queryMetricsFactory;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final ServerConfig serverConfig;
private final AuthConfig authConfig;
@Inject
public QueryLifecycleFactory(
final QueryToolChestWarehouse warehouse,
final QuerySegmentWalker texasRanger,
final GenericQueryMetricsFactory queryMetricsFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final ServerConfig serverConfig,
final AuthConfig authConfig
)
{
this.warehouse = warehouse;
this.texasRanger = texasRanger;
this.queryMetricsFactory = queryMetricsFactory;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.serverConfig = serverConfig;
this.authConfig = authConfig;
}
public QueryLifecycle factorize()
{
return new QueryLifecycle(
warehouse,
texasRanger,
queryMetricsFactory,
emitter,
requestLogger,
serverConfig,
authConfig,
System.currentTimeMillis(),
System.nanoTime()
);
}
}

View File

@ -30,29 +30,17 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DirectDruidClient;
import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
@ -80,8 +68,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -99,48 +85,33 @@ public class QueryResource implements QueryCountStatsProvider
public static final String HEADER_IF_NONE_MATCH = "If-None-Match";
public static final String HEADER_ETAG = "ETag";
protected final QueryToolChestWarehouse warehouse;
protected final ServerConfig config;
protected final QueryLifecycleFactory queryLifecycleFactory;
protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
protected final ObjectMapper serializeDateTimeAsLongJsonMapper;
protected final ObjectMapper serializeDateTimeAsLongSmileMapper;
protected final QuerySegmentWalker texasRanger;
protected final ServiceEmitter emitter;
protected final RequestLogger requestLogger;
protected final QueryManager queryManager;
protected final AuthConfig authConfig;
private final GenericQueryMetricsFactory queryMetricsFactory;
private final AtomicLong successfulQueryCount = new AtomicLong();
private final AtomicLong failedQueryCount = new AtomicLong();
private final AtomicLong interruptedQueryCount = new AtomicLong();
@Inject
public QueryResource(
QueryToolChestWarehouse warehouse,
ServerConfig config,
QueryLifecycleFactory queryLifecycleFactory,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryManager queryManager,
AuthConfig authConfig,
GenericQueryMetricsFactory queryMetricsFactory
AuthConfig authConfig
)
{
this.warehouse = warehouse;
this.config = config;
this.queryLifecycleFactory = queryLifecycleFactory;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper);
this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper);
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.queryManager = queryManager;
this.authConfig = authConfig;
this.queryMetricsFactory = queryMetricsFactory;
}
@DELETE
@ -181,35 +152,21 @@ public class QueryResource implements QueryCountStatsProvider
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
public Response doPost(
InputStream in,
@QueryParam("pretty") String pretty,
final InputStream in,
@QueryParam("pretty") final String pretty,
@Context final HttpServletRequest req // used to get request content-type, remote address and AuthorizationInfo
) throws IOException
{
final long startNs = System.nanoTime();
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
Query<?> query = null;
QueryToolChest toolChest = null;
String queryId = null;
final ResponseContext context = createContext(req.getContentType(), pretty != null);
final String currThreadName = Thread.currentThread().getName();
try {
query = context.getObjectMapper().readValue(in, Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = UUID.randomUUID().toString();
query = query.withId(queryId);
}
query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(query, config);
final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery(
query,
System.currentTimeMillis()
);
toolChest = warehouse.getToolChest(query);
queryLifecycle.initialize(readQuery(req, in, context));
query = queryLifecycle.getQuery();
final String queryId = query.getId();
Thread.currentThread()
.setName(StringUtils.format("%s[%s_%s_%s]", currThreadName, query.getType(), query.getDataSource().getNames(), queryId));
@ -217,49 +174,23 @@ public class QueryResource implements QueryCountStatsProvider
log.debug("Got query [%s]", query);
}
if (authConfig.isEnabled()) {
// This is an experimental feature, see - https://github.com/druid-io/druid/pull/2424
AuthorizationInfo authorizationInfo = (AuthorizationInfo) req.getAttribute(AuthConfig.DRUID_AUTH_TOKEN);
if (authorizationInfo != null) {
for (String dataSource : query.getDataSource().getNames()) {
Access authResult = authorizationInfo.isAuthorized(
new Resource(dataSource, ResourceType.DATASOURCE),
Action.READ
);
if (!authResult.isAllowed()) {
return Response.status(Response.Status.FORBIDDEN).header("Access-Check-Result", authResult).build();
}
}
} else {
throw new ISE("WTF?! Security is enabled but no authorization info found in the request");
}
final Access authResult = queryLifecycle.authorize((AuthorizationInfo) req.getAttribute(AuthConfig.DRUID_AUTH_TOKEN));
if (!authResult.isAllowed()) {
return Response.status(Response.Status.FORBIDDEN).header("Access-Check-Result", authResult).build();
}
String prevEtag = req.getHeader(HEADER_IF_NONE_MATCH);
if (prevEtag != null) {
query = query.withOverriddenContext(
ImmutableMap.of (HEADER_IF_NONE_MATCH, prevEtag)
);
}
final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext);
final QueryLifecycle.QueryResponse queryResponse = queryLifecycle.execute();
final Sequence<?> results = queryResponse.getResults();
final Map<String, Object> responseContext = queryResponse.getResponseContext();
final String prevEtag = getPreviousEtag(req);
if (prevEtag != null && prevEtag.equals(responseContext.get(HEADER_ETAG))) {
return Response.notModified().build();
}
final Sequence results;
if (res == null) {
results = Sequences.empty();
} else {
results = res;
}
final Yielder yielder = Yielders.each(results);
final Yielder<?> yielder = Yielders.each(results);
try {
final Query theQuery = query;
final QueryToolChest theToolChest = toolChest;
boolean shouldFinalize = QueryContexts.isFinalize(query, true);
boolean serializeDateTimeAsLong =
QueryContexts.isSerializeDateTimeAsLong(query, false)
@ -272,8 +203,7 @@ public class QueryResource implements QueryCountStatsProvider
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException
{
boolean success = false;
String exceptionStr = "";
Exception e = null;
CountingOutputStream os = new CountingOutputStream(outputStream);
try {
@ -282,63 +212,21 @@ public class QueryResource implements QueryCountStatsProvider
os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
os.close();
success = true;
}
catch (Exception ex) {
exceptionStr = ex.toString();
e = ex;
log.error(ex, "Unable to send query response.");
throw Throwables.propagate(ex);
}
finally {
try {
if (success) {
successfulQueryCount.incrementAndGet();
} else {
failedQueryCount.incrementAndGet();
}
Thread.currentThread().setName(currThreadName);
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
theToolChest,
theQuery,
req.getRemoteAddr()
);
queryMetrics.success(success);
queryMetrics.reportQueryTime(queryTimeNs).emit(emitter);
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), os.getCount());
DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
theToolChest,
theQuery,
req.getRemoteAddr()
).reportQueryBytes(os.getCount()).emit(emitter);
ImmutableMap.Builder<String, Object> statsMapBuilder = ImmutableMap.builder();
statsMapBuilder.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
statsMapBuilder.put("query/bytes", os.getCount());
statsMapBuilder.put("success", success);
if (!success) {
statsMapBuilder.put("exception", exceptionStr);
}
requestLogger.log(
new RequestLogLine(
new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)),
req.getRemoteAddr(),
theQuery,
new QueryStats(
statsMapBuilder.build()
)
)
);
}
catch (Exception ex) {
log.error(ex, "Unable to log query [%s]!", theQuery);
}
finally {
Thread.currentThread().setName(currThreadName);
if (e == null) {
successfulQueryCount.incrementAndGet();
} else {
failedQueryCount.incrementAndGet();
}
}
}
@ -352,8 +240,7 @@ public class QueryResource implements QueryCountStatsProvider
responseContext.remove(HEADER_ETAG);
}
responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME);
responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED);
DirectDruidClient.removeMagicResponseContextFields(responseContext);
//Limit the response-context header, see https://github.com/druid-io/druid/issues/2331
//Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
@ -379,86 +266,17 @@ public class QueryResource implements QueryCountStatsProvider
}
}
catch (QueryInterruptedException e) {
try {
log.warn(e, "Exception while processing queryId [%s]", queryId);
interruptedQueryCount.incrementAndGet();
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
toolChest,
query,
req.getRemoteAddr()
);
queryMetrics.success(false);
queryMetrics.reportQueryTime(queryTimeNs).emit(emitter);
requestLogger.log(
new RequestLogLine(
new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)),
req.getRemoteAddr(),
query,
new QueryStats(
ImmutableMap.<String, Object>of(
"query/time",
TimeUnit.NANOSECONDS.toMillis(queryTimeNs),
"success",
false,
"interrupted",
true,
"reason",
e.toString()
)
)
)
);
}
catch (Exception e2) {
log.error(e2, "Unable to log query [%s]!", query);
}
interruptedQueryCount.incrementAndGet();
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
return context.gotError(e);
}
catch (Exception e) {
// Input stream has already been consumed by the json object mapper if query == null
final String queryString =
query == null
? "unparsable query"
: query.toString();
log.warn(e, "Exception occurred on request [%s]", queryString);
failedQueryCount.incrementAndGet();
try {
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
toolChest,
query,
req.getRemoteAddr()
);
queryMetrics.success(false);
queryMetrics.reportQueryTime(queryTimeNs).emit(emitter);
requestLogger.log(
new RequestLogLine(
new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)),
req.getRemoteAddr(),
query,
new QueryStats(ImmutableMap.<String, Object>of(
"query/time",
TimeUnit.NANOSECONDS.toMillis(queryTimeNs),
"success",
false,
"exception",
e.toString()
))
)
);
}
catch (Exception e2) {
log.error(e2, "Unable to log query [%s]!", queryString);
}
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
log.makeAlert(e, "Exception handling request")
.addData("exception", e.toString())
.addData("query", queryString)
.addData("query", query != null ? query.toString() : "unparseable query")
.addData("peer", req.getRemoteAddr())
.emit();
@ -469,6 +287,29 @@ public class QueryResource implements QueryCountStatsProvider
}
}
private static Query<?> readQuery(
final HttpServletRequest req,
final InputStream in,
final ResponseContext context
) throws IOException
{
Query baseQuery = context.getObjectMapper().readValue(in, Query.class);
String prevEtag = getPreviousEtag(req);
if (prevEtag != null) {
baseQuery = baseQuery.withOverriddenContext(
ImmutableMap.of(HEADER_IF_NONE_MATCH, prevEtag)
);
}
return baseQuery;
}
private static String getPreviousEtag(final HttpServletRequest req)
{
return req.getHeader(HEADER_IF_NONE_MATCH);
}
protected ObjectMapper serializeDataTimeAsLong(ObjectMapper mapper)
{
return mapper.copy().registerModule(new SimpleModule().addSerializer(DateTime.class, new DateTimeSerializer()));

View File

@ -0,0 +1,39 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.security;
/**
* An AuthorizationInfo that is useful for actions generated internally by the system. It allows everything.
*/
public class SystemAuthorizationInfo implements AuthorizationInfo
{
public static final SystemAuthorizationInfo INSTANCE = new SystemAuthorizationInfo();
private SystemAuthorizationInfo()
{
// Singleton.
}
@Override
public Access isAuthorized(final Resource resource, final Action action)
{
return new Access(true);
}
}

View File

@ -132,16 +132,19 @@ public class QueryResourceTest
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryManager = new QueryManager();
queryResource = new QueryResource(
warehouse,
serverConfig,
new QueryLifecycleFactory(
warehouse,
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
serverConfig,
new AuthConfig()
),
jsonMapper,
jsonMapper,
testSegmentWalker,
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(),
new DefaultGenericQueryMetricsFactory(jsonMapper)
new AuthConfig()
);
}
@ -163,6 +166,16 @@ public class QueryResourceTest
@Test
public void testGoodQuery() throws IOException
{
EasyMock.expect(testServletRequest.getAttribute(EasyMock.anyString())).andReturn(
new AuthorizationInfo()
{
@Override
public Access isAuthorized(Resource resource, Action action)
{
return new Access(true);
}
}
).times(1);
EasyMock.replay(testServletRequest);
Response response = queryResource.doPost(
new ByteArrayInputStream(simpleTimeSeriesQuery.getBytes("UTF-8")),
@ -207,16 +220,19 @@ public class QueryResourceTest
EasyMock.replay(testServletRequest);
queryResource = new QueryResource(
warehouse,
serverConfig,
new QueryLifecycleFactory(
warehouse,
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
serverConfig,
new AuthConfig(true)
),
jsonMapper,
jsonMapper,
testSegmentWalker,
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(true),
new DefaultGenericQueryMetricsFactory(jsonMapper)
new AuthConfig(true)
);
Response response = queryResource.doPost(
@ -278,16 +294,19 @@ public class QueryResourceTest
EasyMock.replay(testServletRequest);
queryResource = new QueryResource(
warehouse,
serverConfig,
new QueryLifecycleFactory(
warehouse,
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
serverConfig,
new AuthConfig(true)
),
jsonMapper,
jsonMapper,
testSegmentWalker,
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(true),
new DefaultGenericQueryMetricsFactory(jsonMapper)
new AuthConfig(true)
);
final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\","
@ -375,16 +394,19 @@ public class QueryResourceTest
EasyMock.replay(testServletRequest);
queryResource = new QueryResource(
warehouse,
serverConfig,
new QueryLifecycleFactory(
warehouse,
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
serverConfig,
new AuthConfig(true)
),
jsonMapper,
jsonMapper,
testSegmentWalker,
new NoopServiceEmitter(),
new NoopRequestLogger(),
queryManager,
new AuthConfig(true),
new DefaultGenericQueryMetricsFactory(jsonMapper)
new AuthConfig(true)
);
final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\","

View File

@ -21,8 +21,7 @@ package io.druid.sql.calcite.planner;
import com.google.inject.Inject;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.QueryLifecycleFactory;
import io.druid.sql.calcite.rel.QueryMaker;
import io.druid.sql.calcite.schema.DruidSchema;
import org.apache.calcite.avatica.util.Casing;
@ -50,35 +49,32 @@ public class PlannerFactory
.build();
private final DruidSchema druidSchema;
private final QuerySegmentWalker walker;
private final QueryLifecycleFactory queryLifecycleFactory;
private final DruidOperatorTable operatorTable;
private final ExprMacroTable macroTable;
private final PlannerConfig plannerConfig;
private final ServerConfig serverConfig;
@Inject
public PlannerFactory(
final DruidSchema druidSchema,
final QuerySegmentWalker walker,
final QueryLifecycleFactory queryLifecycleFactory,
final DruidOperatorTable operatorTable,
final ExprMacroTable macroTable,
final PlannerConfig plannerConfig,
final ServerConfig serverConfig
final PlannerConfig plannerConfig
)
{
this.druidSchema = druidSchema;
this.walker = walker;
this.queryLifecycleFactory = queryLifecycleFactory;
this.operatorTable = operatorTable;
this.macroTable = macroTable;
this.plannerConfig = plannerConfig;
this.serverConfig = serverConfig;
}
public DruidPlanner createPlanner(final Map<String, Object> queryContext)
{
final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema);
final PlannerContext plannerContext = PlannerContext.create(operatorTable, macroTable, plannerConfig, queryContext);
final QueryMaker queryMaker = new QueryMaker(walker, plannerContext, serverConfig);
final QueryMaker queryMaker = new QueryMaker(queryLifecycleFactory, plannerContext);
final FrameworkConfig frameworkConfig = Frameworks
.newConfigBuilder()
.parserConfig(PARSER_CONFIG)

View File

@ -98,7 +98,6 @@ public class DruidNestedGroupBy extends DruidRel<DruidNestedGroupBy>
if (queryDataSource != null) {
return getQueryMaker().runQuery(
queryDataSource,
sourceRel.getOutputRowSignature(),
queryBuilder
);
} else {

View File

@ -164,7 +164,7 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
@Override
public Sequence<Object[]> runQuery()
{
return getQueryMaker().runQuery(druidTable.getDataSource(), druidTable.getRowSignature(), queryBuilder);
return getQueryMaker().runQuery(druidTable.getDataSource(), queryBuilder);
}
@Override

View File

@ -25,7 +25,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Ints;
import io.druid.client.DirectDruidClient;
import io.druid.common.guava.GuavaUtils;
import io.druid.data.input.Row;
import io.druid.java.util.common.ISE;
@ -33,9 +32,8 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.math.expr.Evals;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.Result;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.select.EventHolder;
@ -48,10 +46,9 @@ import io.druid.query.topn.DimensionAndMetricValueExtractor;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.column.Column;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.QueryLifecycleFactory;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerContext;
import io.druid.sql.calcite.table.RowSignature;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.runtime.Hook;
@ -69,19 +66,16 @@ import java.util.concurrent.atomic.AtomicReference;
public class QueryMaker
{
private final QuerySegmentWalker walker;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerContext plannerContext;
private final ServerConfig serverConfig;
public QueryMaker(
final QuerySegmentWalker walker,
final PlannerContext plannerContext,
final ServerConfig serverConfig
final QueryLifecycleFactory queryLifecycleFactory,
final PlannerContext plannerContext
)
{
this.walker = walker;
this.queryLifecycleFactory = queryLifecycleFactory;
this.plannerContext = plannerContext;
this.serverConfig = serverConfig;
}
public PlannerContext getPlannerContext()
@ -91,7 +85,6 @@ public class QueryMaker
public Sequence<Object[]> runQuery(
final DataSource dataSource,
final RowSignature sourceRowSignature,
final DruidQueryBuilder queryBuilder
)
{
@ -161,31 +154,20 @@ public class QueryMaker
@Override
public Sequence<Object[]> next()
{
final SelectQuery queryWithPagination = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery.withPagingSpec(
new PagingSpec(
pagingIdentifiers.get(),
plannerContext.getPlannerConfig().getSelectThreshold(),
true
)
),
serverConfig
final SelectQuery queryWithPagination = baseQuery.withPagingSpec(
new PagingSpec(
pagingIdentifiers.get(),
plannerContext.getPlannerConfig().getSelectThreshold(),
true
)
);
Hook.QUERY_PLAN.run(queryWithPagination);
morePages.set(false);
final AtomicBoolean gotResult = new AtomicBoolean();
return Sequences.concat(
Sequences.map(
QueryPlus.wrap(queryWithPagination)
.run(walker,
DirectDruidClient.makeResponseContextForQuery(
queryWithPagination,
plannerContext.getQueryStartTimeMillis()
)
),
runQuery(queryWithPagination),
new Function<Result<SelectResultValue>, Sequence<Object[]>>()
{
@Override
@ -244,30 +226,29 @@ public class QueryMaker
return Sequences.concat(sequenceOfSequences);
}
@SuppressWarnings("unchecked")
private <T> Sequence<T> runQuery(final Query<T> query)
{
Hook.QUERY_PLAN.run(query);
// Authorization really should be applied in planning. At this point the query has already begun to execute.
// So, use "null" authorizationInfo to force the query to fail if security is enabled.
return queryLifecycleFactory.factorize().runSimple(query, null, null);
}
private Sequence<Object[]> executeTimeseries(
final DruidQueryBuilder queryBuilder,
final TimeseriesQuery baseQuery
final TimeseriesQuery query
)
{
final TimeseriesQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery,
serverConfig
);
final List<RelDataTypeField> fieldList = queryBuilder.getRowType().getFieldList();
final String timeOutputName = queryBuilder.getGrouping().getDimensions().isEmpty()
? null
: Iterables.getOnlyElement(queryBuilder.getGrouping().getDimensions())
.getOutputName();
Hook.QUERY_PLAN.run(query);
return Sequences.map(
QueryPlus.wrap(query)
.run(
walker,
DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis())
),
runQuery(query),
new Function<Result<TimeseriesResultValue>, Object[]>()
{
@Override
@ -293,25 +274,14 @@ public class QueryMaker
private Sequence<Object[]> executeTopN(
final DruidQueryBuilder queryBuilder,
final TopNQuery baseQuery
final TopNQuery query
)
{
final TopNQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery,
serverConfig
);
final List<RelDataTypeField> fieldList = queryBuilder.getRowType().getFieldList();
Hook.QUERY_PLAN.run(query);
return Sequences.concat(
Sequences.map(
QueryPlus.wrap(query)
.run(
walker,
DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis())
),
runQuery(query),
new Function<Result<TopNResultValue>, Sequence<Object[]>>()
{
@Override
@ -339,23 +309,13 @@ public class QueryMaker
private Sequence<Object[]> executeGroupBy(
final DruidQueryBuilder queryBuilder,
final GroupByQuery baseQuery
final GroupByQuery query
)
{
final GroupByQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery,
serverConfig
);
final List<RelDataTypeField> fieldList = queryBuilder.getRowType().getFieldList();
Hook.QUERY_PLAN.run(query);
return Sequences.map(
QueryPlus.wrap(query)
.run(
walker,
DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis())
),
runQuery(query),
new Function<Row, Object[]>()
{
@Override

View File

@ -30,7 +30,6 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DirectDruidClient;
import io.druid.client.ServerView;
import io.druid.client.TimelineServerView;
import io.druid.guice.ManageLifecycle;
@ -41,8 +40,6 @@ import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.query.QueryPlus;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.TableDataSource;
import io.druid.query.metadata.metadata.AllColumnIncluderator;
import io.druid.query.metadata.metadata.ColumnAnalysis;
@ -50,8 +47,9 @@ import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.column.ValueType;
import io.druid.server.QueryLifecycleFactory;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.security.SystemAuthorizationInfo;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.table.DruidTable;
import io.druid.sql.calcite.table.RowSignature;
@ -91,13 +89,12 @@ public class DruidSchema extends AbstractSchema
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private final QuerySegmentWalker walker;
private final QueryLifecycleFactory queryLifecycleFactory;
private final TimelineServerView serverView;
private final PlannerConfig config;
private final ViewManager viewManager;
private final ExecutorService cacheExec;
private final ConcurrentMap<String, DruidTable> tables;
private final ServerConfig serverConfig;
// For awaitInitialization.
private final CountDownLatch initializationLatch = new CountDownLatch(1);
@ -124,20 +121,18 @@ public class DruidSchema extends AbstractSchema
@Inject
public DruidSchema(
final QuerySegmentWalker walker,
final QueryLifecycleFactory queryLifecycleFactory,
final TimelineServerView serverView,
final PlannerConfig config,
final ViewManager viewManager,
final ServerConfig serverConfig
final ViewManager viewManager
)
{
this.walker = Preconditions.checkNotNull(walker, "walker");
this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
this.serverView = Preconditions.checkNotNull(serverView, "serverView");
this.config = Preconditions.checkNotNull(config, "config");
this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
this.tables = Maps.newConcurrentMap();
this.serverConfig = serverConfig;
}
@LifecycleStart
@ -404,8 +399,7 @@ public class DruidSchema extends AbstractSchema
final Set<DataSegment> retVal = new HashSet<>();
final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
walker,
serverConfig,
queryLifecycleFactory,
Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
);
@ -475,8 +469,7 @@ public class DruidSchema extends AbstractSchema
}
private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
final QuerySegmentWalker walker,
final ServerConfig serverConfig,
final QueryLifecycleFactory queryLifecycleFactory,
final Iterable<DataSegment> segments
)
{
@ -491,28 +484,19 @@ public class DruidSchema extends AbstractSchema
.map(DataSegment::toDescriptor).collect(Collectors.toList())
);
final SegmentMetadataQuery segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
new SegmentMetadataQuery(
new TableDataSource(dataSource),
querySegmentSpec,
new AllColumnIncluderator(),
false,
ImmutableMap.of(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
),
serverConfig
final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(dataSource),
querySegmentSpec,
new AllColumnIncluderator(),
false,
ImmutableMap.of(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
);
return QueryPlus.wrap(segmentMetadataQuery)
.run(
walker,
DirectDruidClient.makeResponseContextForQuery(
segmentMetadataQuery,
System.currentTimeMillis()
)
);
// Use SystemAuthorizationInfo since this is a query generated by Druid itself.
return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, SystemAuthorizationInfo.INSTANCE, null);
}
private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)

View File

@ -120,7 +120,13 @@ public class DruidAvaticaHandlerTest
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
druidMeta = new DruidMeta(
new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
new PlannerFactory(
druidSchema,
CalciteTests.createMockQueryLifecycleFactory(walker),
operatorTable,
macroTable,
plannerConfig
),
AVATICA_CONFIG
);
final DruidAvaticaHandler handler = new DruidAvaticaHandler(
@ -545,7 +551,13 @@ public class DruidAvaticaHandlerTest
final List<Meta.Frame> frames = new ArrayList<>();
DruidMeta smallFrameDruidMeta = new DruidMeta(
new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()),
new PlannerFactory(
druidSchema,
CalciteTests.createMockQueryLifecycleFactory(walker),
operatorTable,
macroTable,
plannerConfig
),
smallFrameConfig
)
{

View File

@ -22,7 +22,6 @@ package io.druid.sql.avatica;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.math.expr.ExprMacroTable;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
@ -68,11 +67,10 @@ public class DruidStatementTest
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
plannerFactory = new PlannerFactory(
druidSchema,
walker,
CalciteTests.createMockQueryLifecycleFactory(walker),
operatorTable,
macroTable,
plannerConfig,
new ServerConfig()
plannerConfig
);
}

View File

@ -81,7 +81,6 @@ import io.druid.query.topn.TopNQueryBuilder;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.virtual.ExpressionVirtualColumn;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.filtration.Filtration;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
@ -5354,11 +5353,10 @@ public class CalciteQueryTest
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
walker,
CalciteTests.createMockQueryLifecycleFactory(walker),
operatorTable,
macroTable,
plannerConfig,
new ServerConfig()
plannerConfig
);
viewManager.createView(

View File

@ -29,7 +29,6 @@ import io.druid.java.util.common.Pair;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.QueryInterruptedException;
import io.druid.query.ResourceLimitExceededException;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.DruidOperatorTable;
import io.druid.sql.calcite.planner.PlannerConfig;
@ -80,7 +79,13 @@ public class SqlResourceTest
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
resource = new SqlResource(
JSON_MAPPER,
new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig())
new PlannerFactory(
druidSchema,
CalciteTests.createMockQueryLifecycleFactory(walker),
operatorTable,
macroTable,
plannerConfig
)
);
}

View File

@ -31,7 +31,6 @@ import io.druid.segment.IndexBuilder;
import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.server.initialization.ServerConfig;
import io.druid.sql.calcite.planner.Calcites;
import io.druid.sql.calcite.planner.PlannerConfig;
import io.druid.sql.calcite.table.DruidTable;
@ -141,11 +140,10 @@ public class DruidSchemaTest
);
schema = new DruidSchema(
walker,
CalciteTests.createMockQueryLifecycleFactory(walker),
new TestServerInventoryView(walker.getSegments()),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new ServerConfig()
new NoopViewManager()
);
schema.start();

View File

@ -30,6 +30,8 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.emitter.core.NoopEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
@ -40,12 +42,16 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.guice.ExpressionModule;
import io.druid.guice.annotations.Json;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -77,7 +83,10 @@ import io.druid.segment.IndexBuilder;
import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.server.QueryLifecycleFactory;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.NoopRequestLogger;
import io.druid.server.security.AuthConfig;
import io.druid.sql.calcite.aggregation.SqlAggregator;
import io.druid.sql.calcite.expression.SqlOperatorConversion;
import io.druid.sql.calcite.planner.DruidOperatorTable;
@ -272,6 +281,26 @@ public class CalciteTests
return CONGLOMERATE;
}
public static QueryLifecycleFactory createMockQueryLifecycleFactory(final QuerySegmentWalker walker)
{
return new QueryLifecycleFactory(
new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return CONGLOMERATE.findFactory(query).getToolchest();
}
},
walker,
new DefaultGenericQueryMetricsFactory(INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class))),
new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
new NoopRequestLogger(),
new ServerConfig(),
new AuthConfig()
);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(final File tmpDir)
{
final QueryableIndex index1 = IndexBuilder.create()
@ -353,11 +382,10 @@ public class CalciteTests
)
{
final DruidSchema schema = new DruidSchema(
walker,
CalciteTests.createMockQueryLifecycleFactory(walker),
new TestServerInventoryView(walker.getSegments()),
plannerConfig,
viewManager,
new ServerConfig()
viewManager
);
schema.start();