Expose HTTP Response headers from SqlResource (#13052)

* Expose HTTP Response headers from SqlResource

This change makes the SqlResource expose HTTP response
headers in the same way that the QueryResource exposes them.

Fundamentally, the change is to pipe the QueryResponse
object all the way through to the Resource so that it can
populate response headers.  There is also some code
cleanup around DI, as there was a superfluous FactoryFactory
class muddying things up.
This commit is contained in:
imply-cheddar 2022-09-12 17:40:06 +09:00 committed by GitHub
parent f60ec8e7ca
commit 5ba0075c0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 771 additions and 459 deletions

View File

@ -518,7 +518,7 @@ public class SqlBenchmark
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
blackhole.consume(lastRow);
}

View File

@ -354,7 +354,7 @@ public class SqlExpressionBenchmark
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
blackhole.consume(lastRow);
}

View File

@ -320,7 +320,7 @@ public class SqlNestedDataBenchmark
final String sql = QUERIES.get(Integer.parseInt(query));
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
blackhole.consume(lastRow);
}

View File

@ -169,7 +169,7 @@ public class SqlVsNativeBenchmark
{
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sqlQuery, new QueryContext())) {
final PlannerResult plannerResult = planner.plan();
final Sequence<Object[]> resultSequence = plannerResult.run();
final Sequence<Object[]> resultSequence = plannerResult.run().getResults();
final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in);
blackhole.consume(lastRow);
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Binding annotation for implements of interfaces that are focused on running native queries. This is generally
* contrasted with the MSQ annotation.
*
* @see Parent
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface NativeQuery
{
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Binding annotation for implements of interfaces that are MSQ (MultiStageQuery) focused. This is generally
* contrasted with the NativeQ annotation.
*
* @see Parent
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface MSQ
{
}

View File

@ -22,10 +22,16 @@ package org.apache.druid.msq.guice;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Provides;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.guice.annotations.MSQ;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.guice.SqlBindings;
@ -54,7 +60,21 @@ public class MSQSqlModule implements DruidModule
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class);
// Set up the EXTERN macro.
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
}
@Provides
@MSQ
@LazySingleton
public SqlStatementFactory makeMSQSqlStatementFactory(
final MSQTaskSqlEngine engine,
SqlToolbox toolbox
)
{
return new SqlStatementFactory(toolbox.withEngine(engine));
}
}

View File

@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.indexing.ColumnMapping;
@ -49,6 +48,7 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker
}
@Override
public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
public QueryResponse runQuery(final DruidQuery druidQuery)
{
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
@ -259,7 +259,7 @@ public class MSQTaskQueryMaker implements QueryMaker
);
FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true);
return Sequences.simple(Collections.singletonList(new Object[]{taskId}));
return QueryResponse.withEmptyContext(Sequences.simple(Collections.singletonList(new Object[]{taskId})));
}
private static Map<String, ColumnType> buildAggregationIntermediateTypeMap(final DruidQuery druidQuery)

View File

@ -32,7 +32,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -55,7 +54,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
@LazySingleton
public class MSQTaskSqlEngine implements SqlEngine
{
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =

View File

@ -25,6 +25,7 @@ import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.druid.common.exception.SanitizableException;
import org.apache.druid.guice.annotations.MSQ;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
@ -37,6 +38,7 @@ import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthorizationUtils;
@ -47,7 +49,6 @@ import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.SqlRowTransformer;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlResource;
@ -91,14 +92,13 @@ public class SqlTaskResource
@Inject
public SqlTaskResource(
final MSQTaskSqlEngine engine,
final SqlStatementFactoryFactory sqlStatementFactoryFactory,
final @MSQ SqlStatementFactory sqlStatementFactory,
final ServerConfig serverConfig,
final AuthorizerMapper authorizerMapper,
final ObjectMapper jsonMapper
)
{
this.sqlStatementFactory = sqlStatementFactoryFactory.factorize(engine);
this.sqlStatementFactory = sqlStatementFactory;
this.serverConfig = serverConfig;
this.authorizerMapper = authorizerMapper;
this.jsonMapper = jsonMapper;
@ -147,7 +147,8 @@ public class SqlTaskResource
final String sqlQueryId = stmt.sqlQueryId();
try {
final DirectStatement.ResultSet plan = stmt.plan();
final Sequence<Object[]> sequence = plan.run();
final QueryResponse response = plan.run();
final Sequence sequence = response.getResults();
final SqlRowTransformer rowTransformer = plan.createRowTransformer();
final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());

View File

@ -35,6 +35,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -47,6 +48,7 @@ import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.MSQ;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@ -125,6 +127,7 @@ import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
@ -341,7 +344,17 @@ public class MSQTestBase extends BaseCalciteQueryTest
new JoinableFactoryModule(),
new IndexingServiceTuningConfigModule(),
new MSQIndexingModule(),
new MSQSqlModule(),
Modules.override(new MSQSqlModule()).with(
binder -> {
// Our Guice configuration currently requires bindings to exist even if they aren't ever used, the
// following bindings are overriding other bindings that end up needing a lot more dependencies.
// We replace the bindings with something that returns null to make things more brittle in case they
// actually are used somewhere in the test.
binder.bind(SqlStatementFactory.class).annotatedWith(MSQ.class).toProvider(Providers.of(null));
binder.bind(SqlToolbox.class).toProvider(Providers.of(null));
binder.bind(MSQTaskSqlEngine.class).toProvider(Providers.of(null));
}
),
new MSQExternalDataSourceModule()
));
@ -580,7 +593,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
)
);
final List<Object[]> sequence = stmt.execute().toList();
final List<Object[]> sequence = stmt.execute().getResults().toList();
return (String) Iterables.getOnlyElement(sequence)[0];
}

View File

@ -39,7 +39,6 @@ import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@ -733,6 +732,10 @@ public abstract class ResponseContext
*/
public void merge(ResponseContext responseContext)
{
if (responseContext == null) {
return;
}
responseContext.getDelegate().forEach((key, newValue) -> {
if (newValue != null) {
add(key, newValue);

View File

@ -20,7 +20,6 @@
package org.apache.druid.query;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
@ -31,26 +30,29 @@ import org.apache.druid.utils.CloseableUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
/**
* A buffer pool that throws away buffers when they are "returned" to the pool. Useful for tests that need to make
* many pools and use them one at a time.
*
* <p>
* This pool implements {@link BlockingPool}, but never blocks. It returns immediately if resources are available;
* otherwise it returns an empty list immediately. This is also useful for tests, because it allows "timeouts" to
* happen immediately and therefore speeds up tests.
*/
public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool<ByteBuffer>
{
private final AtomicLong takeCount = new AtomicLong(0);
private final ConcurrentHashMap<Long, RuntimeException> takenFromMap = new ConcurrentHashMap<>();
private final Supplier<ResourceHolder<ByteBuffer>> generator;
private final int maxCount;
@GuardedBy("this")
private long numOutstanding;
private TestBufferPool(final Supplier<ResourceHolder<ByteBuffer>> generator, final int maxCount)
{
this.generator = generator;
@ -60,7 +62,8 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
public static TestBufferPool onHeap(final int bufferSize, final int maxCount)
{
return new TestBufferPool(
() -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {}),
() -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {
}),
maxCount
);
}
@ -102,20 +105,20 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
public List<ReferenceCountingResourceHolder<ByteBuffer>> takeBatch(int elementNum)
{
synchronized (this) {
if (numOutstanding + elementNum <= maxCount) {
if (takenFromMap.size() + elementNum <= maxCount) {
final List<ReferenceCountingResourceHolder<ByteBuffer>> retVal = new ArrayList<>();
try {
for (int i = 0; i < elementNum; i++) {
final ResourceHolder<ByteBuffer> holder = generator.get();
final ByteBuffer o = holder.get();
final long ticker = takeCount.getAndIncrement();
takenFromMap.put(ticker, new RuntimeException());
retVal.add(new ReferenceCountingResourceHolder<>(o, () -> {
synchronized (this) {
numOutstanding--;
holder.close();
}
takenFromMap.remove(ticker);
holder.close();
}));
numOutstanding++;
}
}
catch (Throwable e) {
@ -131,8 +134,11 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
public long getOutstandingObjectCount()
{
synchronized (this) {
return numOutstanding;
}
return takenFromMap.size();
}
public Collection<RuntimeException> getOutstandingExceptionsCreated()
{
return takenFromMap.values();
}
}

View File

@ -85,7 +85,9 @@ public class TestGroupByBuffers implements Closeable
}
if (mergePool != null) {
Assert.assertEquals(0, mergePool.getOutstandingObjectCount());
if (mergePool.getOutstandingObjectCount() != 0) {
throw mergePool.getOutstandingExceptionsCreated().iterator().next();
}
mergePool = null;
}
}

View File

@ -141,7 +141,7 @@ public class QueryLifecycle
* @return results
*/
@SuppressWarnings("unchecked")
public <T> Sequence<T> runSimple(
public <T> QueryResponse runSimple(
final Query<T> query,
final AuthenticationResult authenticationResult,
final Access authorizationResult
@ -151,13 +151,14 @@ public class QueryLifecycle
final Sequence<T> results;
final QueryResponse queryResponse;
try {
preAuthorized(authenticationResult, authorizationResult);
if (!authorizationResult.isAllowed()) {
throw new ISE("Unauthorized");
}
final QueryResponse queryResponse = execute();
queryResponse = execute();
results = queryResponse.getResults();
}
catch (Throwable e) {
@ -165,16 +166,25 @@ public class QueryLifecycle
throw e;
}
return Sequences.wrap(
results,
new SequenceWrapper()
{
@Override
public void after(final boolean isDone, final Throwable thrown)
{
emitLogsAndMetrics(thrown, null, -1);
}
}
/*
* It seems extremely weird that the below code is wrapping the Sequence in order to emitLogsAndMetrics.
* The Sequence was returned by the call to execute, it would be worthwile to figure out why this wrapping
* cannot be moved into execute(). We leave this as an exercise for the future, however as this oddity
* was discovered while just trying to expose HTTP response headers
*/
return new QueryResponse(
Sequences.wrap(
results,
new SequenceWrapper()
{
@Override
public void after(final boolean isDone, final Throwable thrown)
{
emitLogsAndMetrics(thrown, null, -1);
}
}
),
queryResponse.getResponseContext()
);
}
@ -439,25 +449,4 @@ public class QueryLifecycle
DONE
}
public static class QueryResponse
{
private final Sequence results;
private final ResponseContext responseContext;
private QueryResponse(final Sequence results, final ResponseContext responseContext)
{
this.results = results;
this.responseContext = responseContext;
}
public Sequence getResults()
{
return results;
}
public ResponseContext getResponseContext()
{
return responseContext;
}
}
}

View File

@ -200,7 +200,7 @@ public class QueryResource implements QueryCountStatsProvider
throw new ForbiddenException(authResult.toString());
}
final QueryLifecycle.QueryResponse queryResponse = queryLifecycle.execute();
final QueryResponse queryResponse = queryLifecycle.execute();
final Sequence<?> results = queryResponse.getResults();
final ResponseContext responseContext = queryResponse.getResponseContext();
final String prevEtag = getPreviousEtag(req);
@ -255,41 +255,11 @@ public class QueryResource implements QueryCountStatsProvider
)
.header(QUERY_ID_RESPONSE_HEADER, queryId);
transferEntityTag(responseContext, responseBuilder);
DirectDruidClient.removeMagicResponseContextFields(responseContext);
// Limit the response-context header, see https://github.com/apache/druid/issues/2331
// Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
// and encodes the string using ASCII, so 1 char is = 1 byte
final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
jsonMapper,
responseContextConfig.getMaxResponseContextHeaderSize()
attachResponseContextToHttpResponse(queryId, responseContext, responseBuilder, jsonMapper,
responseContextConfig, selfNode
);
if (serializationResult.isTruncated()) {
final String logToPrint = StringUtils.format(
"Response Context truncated for id [%s]. Full context is [%s].",
queryId,
serializationResult.getFullResult()
);
if (responseContextConfig.shouldFailOnTruncatedResponseContext()) {
log.error(logToPrint);
throw new QueryInterruptedException(
new TruncatedResponseContextException(
"Serialized response context exceeds the max size[%s]",
responseContextConfig.getMaxResponseContextHeaderSize()
),
selfNode.getHostAndPortToUse()
);
} else {
log.warn(logToPrint);
}
}
return responseBuilder
.header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult())
.build();
return responseBuilder.build();
}
catch (QueryException e) {
// make sure to close yielder if anything happened before starting to serialize the response.
@ -358,6 +328,48 @@ public class QueryResource implements QueryCountStatsProvider
}
}
public static void attachResponseContextToHttpResponse(
String queryId,
ResponseContext responseContext,
Response.ResponseBuilder responseBuilder,
ObjectMapper jsonMapper, ResponseContextConfig responseContextConfig, DruidNode selfNode
) throws JsonProcessingException
{
transferEntityTag(responseContext, responseBuilder);
DirectDruidClient.removeMagicResponseContextFields(responseContext);
// Limit the response-context header, see https://github.com/apache/druid/issues/2331
// Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString()
// and encodes the string using ASCII, so 1 char is = 1 byte
final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith(
jsonMapper,
responseContextConfig.getMaxResponseContextHeaderSize()
);
if (serializationResult.isTruncated()) {
final String logToPrint = StringUtils.format(
"Response Context truncated for id [%s]. Full context is [%s].",
queryId,
serializationResult.getFullResult()
);
if (responseContextConfig.shouldFailOnTruncatedResponseContext()) {
log.error(logToPrint);
throw new QueryInterruptedException(
new TruncatedResponseContextException(
"Serialized response context exceeds the max size[%s]",
responseContextConfig.getMaxResponseContextHeaderSize()
),
selfNode.getHostAndPortToUse()
);
} else {
log.warn(logToPrint);
}
}
responseBuilder.header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult());
}
private Query<?> readQuery(
final HttpServletRequest req,
final InputStream in,

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
public class QueryResponse
{
public static QueryResponse withEmptyContext(Sequence results)
{
return new QueryResponse(results, ResponseContext.createEmpty());
}
private final Sequence results;
private final ResponseContext responseContext;
public QueryResponse(final Sequence results, final ResponseContext responseContext)
{
this.results = results;
this.responseContext = responseContext;
}
public Sequence getResults()
{
return results;
}
public ResponseContext getResponseContext()
{
return responseContext;
}
}

View File

@ -23,12 +23,10 @@ import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provider;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -54,22 +52,18 @@ public class AuthenticatorMapperModule implements DruidModule
binder.bind(AuthenticatorMapper.class)
.toProvider(new AuthenticatorMapperProvider())
.in(LazySingleton.class);
LifecycleModule.register(binder, AuthenticatorMapper.class);
}
private static class AuthenticatorMapperProvider implements Provider<AuthenticatorMapper>
{
private AuthConfig authConfig;
private Injector injector;
private Properties props;
private JsonConfigurator configurator;
@Inject
public void inject(Injector injector, Properties props, JsonConfigurator configurator)
public void inject(AuthConfig authConfig, Properties props, JsonConfigurator configurator)
{
this.authConfig = injector.getInstance(AuthConfig.class);
this.injector = injector;
this.authConfig = authConfig;
this.props = props;
this.configurator = configurator;
}
@ -91,7 +85,10 @@ public class AuthenticatorMapperModule implements DruidModule
}
for (String authenticatorName : authenticators) {
final String authenticatorPropertyBase = StringUtils.format(AUTHENTICATOR_PROPERTIES_FORMAT_STRING, authenticatorName);
final String authenticatorPropertyBase = StringUtils.format(
AUTHENTICATOR_PROPERTIES_FORMAT_STRING,
authenticatorName
);
final JsonConfigProvider<Authenticator> authenticatorProvider = JsonConfigProvider.of(
authenticatorPropertyBase,
Authenticator.class

View File

@ -20,12 +20,10 @@
package org.apache.druid.server.security;
import com.google.common.collect.Lists;
import org.apache.druid.guice.ManageLifecycle;
import java.util.List;
import java.util.Map;
@ManageLifecycle
public class AuthenticatorMapper
{
private Map<String, Authenticator> authenticatorMap;

View File

@ -19,11 +19,8 @@
package org.apache.druid.server.security;
import org.apache.druid.guice.ManageLifecycle;
import java.util.Map;
@ManageLifecycle
public class AuthorizerMapper
{
private Map<String, Authorizer> authorizerMap;

View File

@ -208,6 +208,11 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>

View File

@ -23,9 +23,9 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
@ -98,7 +98,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
* Do the actual execute step which allows subclasses to wrap the sequence,
* as is sometimes needed for testing.
*/
public Sequence<Object[]> run()
public QueryResponse run()
{
try {
// Check cancellation. Required for SqlResourceTest to work.
@ -176,7 +176,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
*
* @return sequence which delivers query results
*/
public Sequence<Object[]> execute()
public QueryResponse execute()
{
return plan().run();
}

View File

@ -23,11 +23,41 @@ import org.apache.druid.sql.http.SqlQuery;
import javax.servlet.http.HttpServletRequest;
public interface SqlStatementFactory
/**
* A class for the creation of Statements, which happen to be used for Sql.
*/
public class SqlStatementFactory
{
HttpStatement httpStatement(SqlQuery sqlQuery, HttpServletRequest req);
private final SqlToolbox lifecycleToolbox;
DirectStatement directStatement(SqlQueryPlus sqlRequest);
/**
* The construction of these objects in the production code is a bit circuitous. Specifically, the SqlToolbox
* looks like it can be normally injected, except it actually expects to be mutated with a SqlEngine before being
* injected. This is generally accomplished with Guice, examples of which can be seen in the
* SqlStatementFactoryModule.
*
* @param lifecycleToolbox
*/
public SqlStatementFactory(SqlToolbox lifecycleToolbox)
{
this.lifecycleToolbox = lifecycleToolbox;
}
PreparedStatement preparedStatement(SqlQueryPlus sqlRequest);
public HttpStatement httpStatement(
final SqlQuery sqlQuery,
final HttpServletRequest req
)
{
return new HttpStatement(lifecycleToolbox, sqlQuery, req);
}
public DirectStatement directStatement(final SqlQueryPlus sqlRequest)
{
return new DirectStatement(lifecycleToolbox, sqlRequest);
}
public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest)
{
return new PreparedStatement(lifecycleToolbox, sqlRequest);
}
}

View File

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.http.SqlQuery;
import javax.servlet.http.HttpServletRequest;
/**
* Factory factories: when design patterns go too far.
*
* Almost everything we need to create a {@link SqlStatementFactory} is injectable, except for the {@link SqlEngine}.
* So this class exists to produce {@link SqlStatementFactory} once the engine for a query is known.
*/
@LazySingleton
public class SqlStatementFactoryFactory
{
protected final SqlToolbox lifecycleToolbox;
@Inject
public SqlStatementFactoryFactory(
final PlannerFactory plannerFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final QueryScheduler queryScheduler,
final AuthConfig authConfig,
final Supplier<DefaultQueryConfig> defaultQueryConfig,
final SqlLifecycleManager sqlLifecycleManager
)
{
this.lifecycleToolbox = new SqlToolbox(
null,
plannerFactory,
emitter,
requestLogger,
queryScheduler,
authConfig,
defaultQueryConfig.get(),
sqlLifecycleManager
);
}
public SqlStatementFactory factorize(final SqlEngine engine)
{
return new FactoryImpl(lifecycleToolbox.withEngine(engine));
}
private static class FactoryImpl implements SqlStatementFactory
{
private final SqlToolbox lifecycleToolbox;
public FactoryImpl(SqlToolbox lifecycleToolbox)
{
this.lifecycleToolbox = lifecycleToolbox;
}
@Override
public HttpStatement httpStatement(
final SqlQuery sqlQuery,
final HttpServletRequest req
)
{
return new HttpStatement(lifecycleToolbox, sqlQuery, req);
}
@Override
public DirectStatement directStatement(final SqlQueryPlus sqlRequest)
{
return new DirectStatement(lifecycleToolbox, sqlRequest);
}
@Override
public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest)
{
return new PreparedStatement(lifecycleToolbox, sqlRequest);
}
}
}

View File

@ -103,7 +103,7 @@ public class DruidJdbcResultSet implements Closeable
ensure(State.NEW);
try {
state = State.RUNNING;
final Sequence<Object[]> baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get();
final Sequence<Object[]> baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get().getResults();
// We can't apply limits greater than Integer.MAX_VALUE, ignore them.
final Sequence<Object[]> retSequence =

View File

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.calcite.avatica.AvaticaSeverity;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.MissingResultsException;
@ -38,6 +37,7 @@ import org.apache.calcite.avatica.remote.AvaticaRuntimeException;
import org.apache.calcite.avatica.remote.Service.ErrorResponse;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.NativeQuery;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
@ -49,10 +49,8 @@ import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
@ -125,15 +123,14 @@ public class DruidMeta extends MetaImpl
@Inject
public DruidMeta(
final NativeSqlEngine engine,
final SqlStatementFactoryFactory sqlStatementFactoryFactory,
final @NativeQuery SqlStatementFactory sqlStatementFactory,
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final Injector injector
final AuthenticatorMapper authMapper
)
{
this(
sqlStatementFactoryFactory.factorize(engine),
sqlStatementFactory,
config,
errorHandler,
Executors.newSingleThreadScheduledExecutor(
@ -142,7 +139,7 @@ public class DruidMeta extends MetaImpl
.setDaemon(true)
.build()
),
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
authMapper.getAuthenticatorChain()
);
}

View File

@ -68,12 +68,12 @@ import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
@ -462,7 +462,7 @@ public class DruidPlanner implements Closeable
}
// Start the query.
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
final Supplier<QueryResponse> resultsSupplier = () -> {
// sanity check
final Set<ResourceAction> readResourceActions =
plannerContext.getResourceActions()
@ -536,38 +536,40 @@ public class DruidPlanner implements Closeable
planner.getTypeFactory(),
plannerContext.getParameters()
);
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
final Supplier<QueryResponse> resultsSupplier = () -> {
final Enumerable<?> enumerable = theRel.bind(dataContext);
final Enumerator<?> enumerator = enumerable.enumerator();
return Sequences.withBaggage(new BaseSequence<>(
new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
{
@Override
public EnumeratorIterator<Object[]> make()
{
return new EnumeratorIterator<>(new Iterator<Object[]>()
return QueryResponse.withEmptyContext(Sequences.withBaggage(
new BaseSequence<>(
new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
{
@Override
public boolean hasNext()
public EnumeratorIterator<Object[]> make()
{
return enumerator.moveNext();
return new EnumeratorIterator<>(new Iterator<Object[]>()
{
@Override
public boolean hasNext()
{
return enumerator.moveNext();
}
@Override
public Object[] next()
{
return (Object[]) enumerator.current();
}
});
}
@Override
public Object[] next()
public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
{
return (Object[]) enumerator.current();
}
});
}
@Override
public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
{
}
}
), enumerator::close);
}
), enumerator::close)
);
};
return new PlannerResult(resultsSupplier, root.validatedRowType);
}
@ -606,8 +608,9 @@ public class DruidPlanner implements Closeable
log.error(jpe, "Encountered exception while serializing Resources for explain output");
resourcesString = null;
}
final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})));
final Supplier<QueryResponse> resultsSupplier = Suppliers.ofInstance(
QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})))
);
return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
}

View File

@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.server.QueryResponse;
import java.util.concurrent.atomic.AtomicBoolean;
@ -32,12 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class PlannerResult
{
private final Supplier<Sequence<Object[]>> resultsSupplier;
private final Supplier<QueryResponse> resultsSupplier;
private final RelDataType rowType;
private final AtomicBoolean didRun = new AtomicBoolean();
public PlannerResult(
final Supplier<Sequence<Object[]>> resultsSupplier,
final Supplier<QueryResponse> resultsSupplier,
final RelDataType rowType
)
{
@ -53,7 +53,7 @@ public class PlannerResult
/**
* Run the query
*/
public Sequence<Object[]> run()
public QueryResponse run()
{
if (!didRun.compareAndSet(false, true)) {
// Safety check.

View File

@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rel;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
@ -45,7 +45,7 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
@Nullable
public abstract PartialDruidQuery getPartialDruidQuery();
public Sequence<Object[]> runQuery()
public QueryResponse runQuery()
{
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
// is the outermost query, and it will actually get run as a native query. Druid's native query layer will

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.rel;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@ -30,14 +31,19 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -47,7 +53,7 @@ import java.util.stream.Collectors;
* but rather, it represents the concatenation of a series of native queries in the SQL layer. Therefore,
* {@link #getPartialDruidQuery()} returns null, and this rel cannot be built on top of. It must be the outer rel in a
* query plan.
*
* <p>
* See {@link DruidUnionDataSourceRel} for a version that does a regular Druid query using a {@link UnionDataSource}.
* In the future we expect that {@link UnionDataSource} will gain the ability to union query datasources together, and
* then this rel could be replaced by {@link DruidUnionDataSourceRel}.
@ -100,18 +106,51 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
}
@Override
@SuppressWarnings("unchecked")
public Sequence<Object[]> runQuery()
@SuppressWarnings({"unchecked", "rawtypes"})
public QueryResponse runQuery()
{
// Lazy: run each query in sequence, not all at once.
if (limit == 0) {
return Sequences.empty();
return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty());
} else {
final Sequence baseSequence = Sequences.concat(
FluentIterable.from(rels).transform(rel -> ((DruidRel) rel).runQuery())
);
return limit > 0 ? baseSequence.limit(limit) : baseSequence;
// We run the first rel here for two reasons:
// 1) So that we get things running as normally expected when runQuery() is called
// 2) So that we have a QueryResponse to return, note that the response headers from the query will only
// have values from this first query and will not contain values from subsequent queries. This is definitely
// sub-optimal, the other option would be to fire off all queries and combine their QueryResponses, but that
// is also sub-optimal as it would consume parallel query resources and potentially starve the system.
// Instead, we only return the headers from the first query and potentially exception out and fail the query
// if there are any response headers that come from subsequent queries that are correctness concerns
final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery();
final List<Sequence<Object>> firstAsList = Collections.singletonList(queryResponse.getResults());
final Iterable<Sequence<Object>> theRestTransformed = FluentIterable
.from(rels.subList(1, rels.size()))
.transform(
rel -> {
final QueryResponse response = ((DruidRel) rel).runQuery();
final ResponseContext nextContext = response.getResponseContext();
final List<Interval> uncoveredIntervals = nextContext.getUncoveredIntervals();
if (uncoveredIntervals == null || uncoveredIntervals.isEmpty()) {
return response.getResults();
} else {
throw new ISE(
"uncoveredIntervals[%s] existed on a sub-query of a union, incomplete data, failing",
uncoveredIntervals
);
}
}
);
final Iterable<Sequence<Object>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
final Sequence returnSequence = Sequences.concat(recombinedSequences);
return new QueryResponse(
limit > 0 ? returnSequence.limit(limit) : returnSequence,
queryResponse.getResponseContext()
);
}
}

View File

@ -54,6 +54,7 @@ import org.apache.druid.segment.data.ComparableList;
import org.apache.druid.segment.data.ComparableStringArray;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.calcite.planner.Calcites;
@ -93,7 +94,7 @@ public class NativeQueryMaker implements QueryMaker
}
@Override
public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
public QueryResponse runQuery(final DruidQuery druidQuery)
{
final Query<?> query = druidQuery.getQuery();
@ -172,7 +173,7 @@ public class NativeQueryMaker implements QueryMaker
.orElseGet(query::getIntervals);
}
private <T> Sequence<Object[]> execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
private <T> QueryResponse execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
{
Hook.QUERY_PLAN.run(query);
@ -194,23 +195,22 @@ public class NativeQueryMaker implements QueryMaker
// otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
// array-based results before starting the query; but in practice we don't expect this to happen since we keep
// tight control over which query types we generate in the SQL layer. They all support array-based results.)
final Sequence<T> results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
//noinspection unchecked
final QueryToolChest<T, Query<T>> toolChest = queryLifecycle.getToolChest();
final List<String> resultArrayFields = toolChest.resultArraySignature(query).getColumnNames();
final Sequence<Object[]> resultArrays = toolChest.resultsAsArrays(query, results);
return mapResultSequence(resultArrays, resultArrayFields, newFields, newTypes);
return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes);
}
private Sequence<Object[]> mapResultSequence(
final Sequence<Object[]> sequence,
final List<String> originalFields,
private <T> QueryResponse mapResultSequence(
final QueryResponse results,
final QueryToolChest<T, Query<T>> toolChest,
final Query<T> query,
final List<String> newFields,
final List<SqlTypeName> newTypes
)
{
final List<String> originalFields = toolChest.resultArraySignature(query).getColumnNames();
// Build hash map for looking up original field positions, in case the number of fields is super high.
final Object2IntMap<String> originalFieldsLookup = new Object2IntOpenHashMap<>();
originalFieldsLookup.defaultReturnValue(-1);
@ -234,15 +234,20 @@ public class NativeQueryMaker implements QueryMaker
mapping[i] = idx;
}
return Sequences.map(
sequence,
array -> {
final Object[] newArray = new Object[mapping.length];
for (int i = 0; i < mapping.length; i++) {
newArray[i] = coerce(array[mapping[i]], newTypes.get(i));
}
return newArray;
}
//noinspection unchecked
final Sequence<Object[]> sequence = toolChest.resultsAsArrays(query, results.getResults());
return new QueryResponse(
Sequences.map(
sequence,
array -> {
final Object[] newArray = new Object[mapping.length];
for (int i = 0; i < mapping.length; i++) {
newArray[i] = coerce(array[mapping[i]], newTypes.get(i));
}
return newArray;
}
),
results.getResponseContext()
);
}

View File

@ -19,7 +19,7 @@
package org.apache.druid.sql.calcite.run;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.rel.DruidQuery;
/**
@ -33,5 +33,5 @@ public interface QueryMaker
* created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
* {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
*/
Sequence<Object[]> runQuery(DruidQuery druidQuery);
QueryResponse runQuery(DruidQuery druidQuery);
}

View File

@ -66,7 +66,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Comparator;
import java.util.EnumSet;
@ -907,7 +906,7 @@ public class SegmentMetadataCache
return queryLifecycleFactory
.factorize()
.runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK);
.runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK).getResults();
}
@VisibleForTesting

View File

@ -19,18 +19,30 @@
package org.apache.druid.sql.guice;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.NativeQuery;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.avatica.AvaticaModule;
import org.apache.druid.sql.calcite.aggregation.SqlAggregationModule;
import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
import org.apache.druid.sql.calcite.planner.CalcitePlannerModule;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule;
import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
@ -50,17 +62,10 @@ public class SqlModule implements Module
public static final String PROPERTY_SQL_SCHEMA_MANAGER_TYPE = "druid.sql.schemamanager.type";
public static final String PROPERTY_SQL_APPROX_COUNT_DISTINCT_CHOICE = "druid.sql.approxCountDistinct.function";
@Inject
private Properties props;
public SqlModule()
{
}
@VisibleForTesting
public SqlModule(
Properties props
)
@Inject
public void setProps(Properties props)
{
this.props = props;
}
@ -101,6 +106,8 @@ public class SqlModule implements Module
binder.install(new SqlAggregationModule());
binder.install(new DruidViewModule());
binder.install(new SqlStatementFactoryModule());
// QueryLookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupExtractorFactoryContainerProvider injected.
SqlBindings.addOperatorConversion(binder, QueryLookupOperatorConversion.class);
@ -130,4 +137,59 @@ public class SqlModule implements Module
Preconditions.checkNotNull(props, "props");
return Boolean.valueOf(props.getProperty(PROPERTY_SQL_ENABLE_AVATICA, "true"));
}
/**
* We create a new class for this module so that it can be shared by tests. The structuring of the SqlModule
* at time of writing was not conducive to reuse in test code, so, instead of fixing that we just take the easy
* way out of adding the test-reusable code to this module and reuse that.
*
* Generally speaking, the injection pattern done by this module is a bit circuitous. The `SqlToolbox` acts as
* if it can be injected with all of its dependencies, but also expects to be mutated with a new SqlEngine. We
* should likely look at adjusting the object dependencies to actually depend on the SqlToolbox and create
* different Toolboxes for the different way that queries are done. But, for now, I'm not changing the interfaces.
*/
public static class SqlStatementFactoryModule implements Module
{
@Provides
@LazySingleton
public SqlToolbox makeSqlToolbox(
final PlannerFactory plannerFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final QueryScheduler queryScheduler,
final AuthConfig authConfig,
final Supplier<DefaultQueryConfig> defaultQueryConfig,
final SqlLifecycleManager sqlLifecycleManager
)
{
return new SqlToolbox(
null,
plannerFactory,
emitter,
requestLogger,
queryScheduler,
authConfig,
defaultQueryConfig.get(),
sqlLifecycleManager
);
}
@Provides
@NativeQuery
@LazySingleton
public SqlStatementFactory makeNativeSqlStatementFactory(
final NativeSqlEngine sqlEngine,
SqlToolbox toolbox
)
{
return new SqlStatementFactory(toolbox.withEngine(sqlEngine));
}
@Override
public void configure(Binder binder)
{
// Do nothing, this class exists for the Provider methods
}
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.sql.http;
import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
/**
* The Module responsible for providing bindings to the SQL http endpoint
@ -31,6 +32,7 @@ public class SqlHttpModule implements Module
@Override
public void configure(Binder binder)
{
binder.bind(SqlResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, SqlResource.class);
}
}

View File

@ -21,15 +21,14 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.druid.common.exception.SanitizableException;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.NativeQuery;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
@ -38,6 +37,10 @@ import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthorizationUtils;
@ -52,8 +55,6 @@ import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.SqlRowTransformer;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@ -63,13 +64,14 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@ -87,33 +89,18 @@ public class SqlResource
private final SqlStatementFactory sqlStatementFactory;
private final SqlLifecycleManager sqlLifecycleManager;
private final ServerConfig serverConfig;
private final ResponseContextConfig responseContextConfig;
private final DruidNode selfNode;
@Inject
public SqlResource(
@Json ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper,
NativeSqlEngine engine,
SqlStatementFactoryFactory sqlStatementFactoryFactory,
SqlLifecycleManager sqlLifecycleManager,
ServerConfig serverConfig
)
{
this(
jsonMapper,
authorizerMapper,
sqlStatementFactoryFactory.factorize(engine),
sqlLifecycleManager,
serverConfig
);
}
@VisibleForTesting
SqlResource(
final ObjectMapper jsonMapper,
final AuthorizerMapper authorizerMapper,
final SqlStatementFactory sqlStatementFactory,
final @NativeQuery SqlStatementFactory sqlStatementFactory,
final SqlLifecycleManager sqlLifecycleManager,
final ServerConfig serverConfig
final ServerConfig serverConfig,
ResponseContextConfig responseContextConfig,
@Self DruidNode selfNode
)
{
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
@ -121,6 +108,9 @@ public class SqlResource
this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory, "sqlStatementFactory");
this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager");
this.serverConfig = Preconditions.checkNotNull(serverConfig, "serverConfig");
this.responseContextConfig = responseContextConfig;
this.selfNode = selfNode;
}
@POST
@ -138,17 +128,20 @@ public class SqlResource
try {
Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
ResultSet resultSet = stmt.plan();
final Sequence<Object[]> sequence = resultSet.run();
final QueryResponse response = resultSet.run();
final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
final Yielder<Object[]> yielder0 = Yielders.each(sequence);
final Yielder<Object[]> finalYielder = Yielders.each(response.getResults());
try {
final Response.ResponseBuilder responseBuilder = Response
.ok(
(StreamingOutput) outputStream -> {
final Response.ResponseBuilder responseBuilder = Response
.ok(
new StreamingOutput()
{
@Override
public void write(OutputStream output) throws IOException, WebApplicationException
{
Exception e = null;
CountingOutputStream os = new CountingOutputStream(outputStream);
Yielder<Object[]> yielder = yielder0;
CountingOutputStream os = new CountingOutputStream(output);
Yielder<Object[]> yielder = finalYielder;
try (final ResultFormat.Writer writer = sqlQuery.getResultFormat()
.createFormatter(os, jsonMapper)) {
@ -185,20 +178,24 @@ public class SqlResource
endLifecycle(stmt, e, os.getCount());
}
}
)
.header(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
}
)
.header(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId);
if (sqlQuery.includeHeader()) {
responseBuilder.header(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE);
}
if (sqlQuery.includeHeader()) {
responseBuilder.header(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE);
}
return responseBuilder.build();
}
catch (Throwable e) {
// make sure to close yielder if anything happened before starting to serialize the response.
yielder0.close();
throw new RuntimeException(e);
}
QueryResource.attachResponseContextToHttpResponse(
sqlQueryId,
response.getResponseContext(),
responseBuilder,
jsonMapper,
responseContextConfig,
selfNode
);
return responseBuilder.build();
}
catch (QueryCapacityExceededException cap) {
endLifecycle(stmt, cap, -1);

View File

@ -19,7 +19,6 @@
package org.apache.druid.sql;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -70,7 +69,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -159,15 +157,18 @@ public class SqlStatementTest
new CalciteRulesManager(ImmutableSet.of())
);
this.sqlStatementFactory = new SqlStatementFactoryFactory(
plannerFactory,
new NoopServiceEmitter(),
testRequestLogger,
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new AuthConfig(),
Suppliers.ofInstance(defaultQueryConfig),
new SqlLifecycleManager()
).factorize(CalciteTests.createMockSqlEngine(walker, conglomerate));
this.sqlStatementFactory = new SqlStatementFactory(
new SqlToolbox(
CalciteTests.createMockSqlEngine(walker, conglomerate),
plannerFactory,
new NoopServiceEmitter(),
testRequestLogger,
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new AuthConfig(),
defaultQueryConfig,
new SqlLifecycleManager()
)
);
}
@After
@ -221,7 +222,7 @@ public class SqlStatementTest
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
ResultSet resultSet = stmt.plan();
assertTrue(resultSet.runnable());
List<Object[]> results = resultSet.run().toList();
List<Object[]> results = resultSet.run().getResults().toList();
assertEquals(1, results.size());
assertEquals(6L, results.get(0)[0]);
assertEquals("foo", results.get(0)[1]);
@ -341,7 +342,7 @@ public class SqlStatementTest
makeQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo"),
request(true)
);
List<Object[]> results = stmt.execute().toList();
List<Object[]> results = stmt.execute().getResults().toList();
assertEquals(1, results.size());
assertEquals(6L, results.get(0)[0]);
assertEquals("foo", results.get(0)[1]);
@ -422,6 +423,7 @@ public class SqlStatementTest
List<Object[]> results = stmt
.execute(Collections.emptyList())
.execute()
.getResults()
.toList();
assertEquals(1, results.size());
assertEquals(6L, results.get(0)[0]);

View File

@ -72,8 +72,6 @@ import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.schema.NamedSchema;
@ -81,6 +79,7 @@ import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryLogHook;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.guice.SqlModule;
import org.eclipse.jetty.server.Server;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -221,7 +220,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
binder.bind(QueryScheduler.class)
.toProvider(QuerySchedulerProvider.class)
.in(LazySingleton.class);
binder.bind(SqlEngine.class).to(NativeSqlEngine.class);
binder.install(new SqlModule.SqlStatementFactoryModule());
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
}

View File

@ -917,7 +917,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
.auth(authenticationResult)
.build()
);
Sequence<Object[]> results = stmt.execute();
Sequence<Object[]> results = stmt.execute().getResults();
RelDataType rowType = stmt.prepareResult().getReturnedRowType();
return new Pair<>(
RowSignatures.fromRelDataType(rowType.getFieldNames(), rowType),

View File

@ -4559,7 +4559,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
QueryLifecycleFactory qlf = CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate);
QueryLifecycle ql = qlf.factorize();
Sequence seq = ql.runSimple(query, CalciteTests.SUPER_USER_AUTH_RESULT, Access.OK);
Sequence seq = ql.runSimple(query, CalciteTests.SUPER_USER_AUTH_RESULT, Access.OK).getResults();
List<Object> results = seq.toList();
Assert.assertEquals(
ImmutableList.of(ResultRow.of("def")),

View File

@ -200,8 +200,8 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe
) {
final PlannerResult vectorPlan = vectorPlanner.plan();
final PlannerResult nonVectorPlan = nonVectorPlanner.plan();
final Sequence<Object[]> vectorSequence = vectorPlan.run();
final Sequence<Object[]> nonVectorSequence = nonVectorPlan.run();
final Sequence<Object[]> vectorSequence = vectorPlan.run().getResults();
final Sequence<Object[]> nonVectorSequence = nonVectorPlan.run().getResults();
Yielder<Object[]> vectorizedYielder = Yielders.each(vectorSequence);
Yielder<Object[]> nonVectorizedYielder = Yielders.each(nonVectorSequence);
int row = 0;

View File

@ -21,9 +21,9 @@ package org.apache.druid.sql.calcite;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.runtime.Hook;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.run.QueryMaker;
@ -45,7 +45,7 @@ public class TestInsertQueryMaker implements QueryMaker
}
@Override
public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
public QueryResponse runQuery(final DruidQuery druidQuery)
{
// Don't actually execute anything, but do record information that tests will check for.
@ -53,6 +53,8 @@ public class TestInsertQueryMaker implements QueryMaker
Hook.QUERY_PLAN.run(druidQuery.getQuery());
// 2) Return the dataSource and signature of the insert operation, so tests can confirm they are correct.
return Sequences.simple(ImmutableList.of(new Object[]{targetDataSource, signature}));
return QueryResponse.withEmptyContext(
Sequences.simple(ImmutableList.of(new Object[]{targetDataSource, signature}))
);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -53,6 +54,7 @@ import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.Access;
@ -1089,7 +1091,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
// This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context
EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
.andReturn(null);
.andReturn(QueryResponse.withEmptyContext(Sequences.empty()));
EasyMock.replay(factoryMock, lifecycleMock);

View File

@ -114,7 +114,7 @@ import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
@ -804,15 +804,18 @@ public class CalciteTests
final AuthConfig authConfig
)
{
return new SqlStatementFactoryFactory(
plannerFactory,
new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
new NoopRequestLogger(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
authConfig,
Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())),
new SqlLifecycleManager()
).factorize(engine);
return new SqlStatementFactory(
new SqlToolbox(
engine,
plannerFactory,
new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
new NoopRequestLogger(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
authConfig,
new DefaultQueryConfig(ImmutableMap.of()),
new SqlLifecycleManager()
)
);
}
public static ObjectMapper getJsonMapper()

View File

@ -53,6 +53,8 @@ import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.initialization.AuthenticatorMapperModule;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthorizerMapper;
@ -63,6 +65,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.view.DruidViewMacro;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.sql.http.SqlResourceTest;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
@ -165,12 +168,16 @@ public class SqlModuleTest
private Injector makeInjectorWithProperties(final Properties props)
{
final SqlModule sqlModule = new SqlModule();
sqlModule.setProps(props);
return Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
new LifecycleModule(),
new ServerModule(),
new JacksonModule(),
new AuthenticatorMapperModule(),
(Module) binder -> {
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
@ -196,8 +203,9 @@ public class SqlModuleTest
binder.bind(QueryScheduler.class)
.toProvider(QuerySchedulerProvider.class)
.in(LazySingleton.class);
binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG);
},
new SqlModule(props),
sqlModule,
new TestViewManagerModule()
)
);

View File

@ -29,11 +29,13 @@ import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.JSR311Resource;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.NativeQuery;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlStatementFactoryFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
@ -50,8 +52,6 @@ public class SqlHttpModuleTest
{
@Mock
private ObjectMapper jsonMpper;
@Mock
private SqlStatementFactoryFactory sqlStatementFactoryFactory;
private SqlHttpModule target;
private Injector injector;
@ -59,39 +59,40 @@ public class SqlHttpModuleTest
@Before
public void setUp()
{
EasyMock.expect(sqlStatementFactoryFactory.factorize(EasyMock.capture(Capture.newInstance())))
.andReturn(EasyMock.mock(SqlStatementFactory.class))
.anyTimes();
EasyMock.replay(sqlStatementFactoryFactory);
target = new SqlHttpModule();
injector = Guice.createInjector(
new LifecycleModule(),
new DruidGuiceExtensions(),
binder -> {
binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper);
binder.bind(SqlStatementFactoryFactory.class).toInstance(sqlStatementFactoryFactory);
binder.bind(AuthorizerMapper.class).toInstance(new AuthorizerMapper(Collections.emptyMap()));
binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new NativeSqlEngine(null, null)));
binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(SqlResourceTest.DUMMY_DRUID_NODE);
binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG);
binder.bind(SqlStatementFactory.class)
.annotatedWith(NativeQuery.class)
.toInstance(EasyMock.mock(SqlStatementFactory.class));
},
target
);
}
@Test
public void testSqlResourceIsInjectedAndNotSingleton()
public void testSqlResourceIsInjectedAndSingleton()
{
SqlResource sqlResource = injector.getInstance(SqlResource.class);
Assert.assertNotNull(sqlResource);
SqlResource other = injector.getInstance(SqlResource.class);
Assert.assertNotSame(other, sqlResource);
Assert.assertSame(other, sqlResource);
}
@Test
public void testSqlResourceIsAvailableViaJersey()
{
Set<Class<?>> jerseyResourceClasses =
injector.getInstance(Key.get(new TypeLiteral<Set<Class<?>>>() {}, JSR311Resource.class));
injector.getInstance(Key.get(new TypeLiteral<Set<Class<?>>>()
{
}, JSR311Resource.class));
Assert.assertEquals(1, jerseyResourceClasses.size());
Assert.assertEquals(SqlResource.class, jerseyResourceClasses.iterator().next());
}

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.calcite.avatica.SqlType;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.exception.AllowedRegexErrorResponseTransformStrategy;
import org.apache.druid.common.exception.ErrorResponseTransformStrategy;
@ -57,9 +58,13 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
@ -96,10 +101,8 @@ import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -110,7 +113,6 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -130,11 +132,15 @@ import java.util.stream.Collectors;
public class SqlResourceTest extends CalciteTestBase
{
public static final DruidNode DUMMY_DRUID_NODE = new DruidNode("dummy", "dummy", false, 1, null, true, false);
public static final ResponseContextConfig TEST_RESPONSE_CONTEXT_CONFIG = ResponseContextConfig.newConfig(false);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DUMMY_SQL_QUERY_ID = "dummy";
// Timeout to allow (rapid) debugging, while not blocking tests with errors.
private static final int WAIT_TIMEOUT_SECS = 60;
private static final Consumer<DirectStatement> NULL_ACTION = s -> {};
private static final Consumer<DirectStatement> NULL_ACTION = s -> {
};
private static final List<String> EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS =
Arrays.asList("__time", "dim1", "dim2", "dim3", "cnt", "m1", "m2", "unique_dim1", "EXPR$8");
@ -166,26 +172,17 @@ public class SqlResourceTest extends CalciteTestBase
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier = new SettableSupplier<>();
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier = new SettableSupplier<>();
private final SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier = new SettableSupplier<>();
private final SettableSupplier<ResponseContext> responseContextSupplier = new SettableSupplier<>();
private Consumer<DirectStatement> onExecute = NULL_ACTION;
private boolean sleep;
@BeforeClass
public static void setUpClass()
{
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass
public static void tearDownClass() throws IOException
{
resourceCloser.close();
}
@Before
public void setUp() throws Exception
{
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
final QueryScheduler scheduler = new QueryScheduler(
5,
ManualQueryPrioritizationStrategy.INSTANCE,
@ -265,7 +262,7 @@ public class SqlResourceTest extends CalciteTestBase
defaultQueryConfig,
lifecycleManager
);
sqlStatementFactory = new SqlStatementFactory()
sqlStatementFactory = new SqlStatementFactory(null)
{
@Override
public HttpStatement httpStatement(
@ -281,6 +278,7 @@ public class SqlResourceTest extends CalciteTestBase
planLatchSupplier,
executeLatchSupplier,
sequenceMapFnSupplier,
responseContextSupplier,
onExecute
);
onExecute = NULL_ACTION;
@ -304,7 +302,9 @@ public class SqlResourceTest extends CalciteTestBase
CalciteTests.TEST_AUTHORIZER_MAPPER,
sqlStatementFactory,
lifecycleManager,
new ServerConfig()
new ServerConfig(),
TEST_RESPONSE_CONTEXT_CONFIG,
DUMMY_DRUID_NODE
);
}
@ -320,6 +320,7 @@ public class SqlResourceTest extends CalciteTestBase
walker = null;
executorService.shutdownNow();
executorService.awaitTermination(2, TimeUnit.SECONDS);
resourceCloser.close();
}
@Test
@ -358,6 +359,55 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
}
@Test
public void testCountStarWithMissingIntervalsContext() throws Exception
{
final SqlQuery sqlQuery = new SqlQuery(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
null,
false,
false,
false,
// We set uncoveredIntervalsLimit more for the funzies than anything. The underlying setup of the test doesn't
// actually look at it or operate with it. Instead, we set the supplier of the ResponseContext to mock what
// we would expect from the normal query pipeline
ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id", "uncoveredIntervalsLimit", 1),
null
);
final ResponseContext mockRespContext = ResponseContext.createEmpty();
mockRespContext.put(ResponseContext.Keys.instance().keyOf("uncoveredIntervals"), "2030-01-01/78149827981274-01-01");
mockRespContext.put(ResponseContext.Keys.instance().keyOf("uncoveredIntervalsOverflowed"), "true");
responseContextSupplier.set(mockRespContext);
final Response response = resource.doPost(sqlQuery, makeRegularUserReq());
Map responseContext = JSON_MAPPER.readValue(
(String) response.getMetadata().getFirst("X-Druid-Response-Context"),
Map.class
);
Assert.assertEquals(
ImmutableMap.of(
"uncoveredIntervals", "2030-01-01/78149827981274-01-01",
"uncoveredIntervalsOverflowed", "true"
),
responseContext
);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
((StreamingOutput) response.getEntity()).write(baos);
Object results = JSON_MAPPER.readValue(baos.toByteArray(), Object.class);
Assert.assertEquals(
ImmutableList.of(
ImmutableMap.of("cnt", 6, "TheFoo", "foo")
),
results
);
checkSqlRequestLog(true);
Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
}
@Test
public void testSqlLifecycleMetrics() throws Exception
{
@ -594,7 +644,9 @@ public class SqlResourceTest extends CalciteTestBase
),
doPost(
new SqlQuery(query, ResultFormat.ARRAY, false, false, false, null, null),
new TypeReference<List<List<Object>>>() {}
new TypeReference<List<List<Object>>>()
{
}
).rhs
);
}
@ -705,7 +757,9 @@ public class SqlResourceTest extends CalciteTestBase
),
doPost(
new SqlQuery(query, ResultFormat.ARRAY, true, true, true, null, null),
new TypeReference<List<List<Object>>>() {}
new TypeReference<List<List<Object>>>()
{
}
).rhs
);
}
@ -729,7 +783,9 @@ public class SqlResourceTest extends CalciteTestBase
),
doPost(
new SqlQuery(query, ResultFormat.ARRAY, true, true, true, null, null),
new TypeReference<List<List<Object>>>() {}
new TypeReference<List<List<Object>>>()
{
}
).rhs
);
}
@ -896,7 +952,9 @@ public class SqlResourceTest extends CalciteTestBase
).stream().map(transformer).collect(Collectors.toList()),
doPost(
new SqlQuery(query, ResultFormat.OBJECT, false, false, false, null, null),
new TypeReference<List<Map<String, Object>>>() {}
new TypeReference<List<Map<String, Object>>>()
{
}
).rhs
);
}
@ -1102,10 +1160,10 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertEquals(4, lines.size());
Assert.assertEquals(expectedHeader, JSON_MAPPER.readValue(lines.get(0), Object.class));
Assert.assertEquals(
ImmutableMap
.<String, Object>builder()
.put("EXPR$0", Arrays.asList(1, 2))
.build(),
ImmutableMap
.<String, Object>builder()
.put("EXPR$0", Arrays.asList(1, 2))
.build(),
JSON_MAPPER.readValue(lines.get(1), Object.class)
);
@ -1338,8 +1396,8 @@ public class SqlResourceTest extends CalciteTestBase
false,
ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id"),
null
)
).lhs;
)
).lhs;
Assert.assertNotNull(exception);
Assert.assertEquals(QueryUnsupportedException.ERROR_CODE, exception.getErrorCode());
@ -1362,8 +1420,9 @@ public class SqlResourceTest extends CalciteTestBase
false,
ImmutableMap.of("sqlQueryId", queryId),
null
),
req);
),
req
);
Assert.assertNotEquals(200, response.getStatus());
final MultivaluedMap<String, Object> headers = response.getMetadata();
Assert.assertTrue(headers.containsKey(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER));
@ -1385,8 +1444,9 @@ public class SqlResourceTest extends CalciteTestBase
false,
ImmutableMap.of(),
null
),
req);
),
req
);
Assert.assertNotEquals(200, response.getStatus());
final MultivaluedMap<String, Object> headers = response.getMetadata();
Assert.assertTrue(headers.containsKey(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER));
@ -1402,7 +1462,8 @@ public class SqlResourceTest extends CalciteTestBase
CalciteTests.TEST_AUTHORIZER_MAPPER,
sqlStatementFactory,
lifecycleManager,
new ServerConfig() {
new ServerConfig()
{
@Override
public boolean isShowDetailedJettyErrors()
{
@ -1414,7 +1475,9 @@ public class SqlResourceTest extends CalciteTestBase
{
return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
}
}
},
TEST_RESPONSE_CONTEXT_CONFIG,
DUMMY_DRUID_NODE
);
String errorMessage = "This will be supported in Druid 9999";
@ -1428,8 +1491,8 @@ public class SqlResourceTest extends CalciteTestBase
false,
ImmutableMap.of("sqlQueryId", "id"),
null
)
).lhs;
)
).lhs;
Assert.assertNotNull(exception);
Assert.assertNull(exception.getMessage());
@ -1460,7 +1523,9 @@ public class SqlResourceTest extends CalciteTestBase
{
return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
}
}
},
TEST_RESPONSE_CONTEXT_CONFIG,
DUMMY_DRUID_NODE
);
String errorMessage = "could not assert";
@ -1477,8 +1542,8 @@ public class SqlResourceTest extends CalciteTestBase
false,
ImmutableMap.of("sqlQueryId", "id"),
null
)
).lhs;
)
).lhs;
Assert.assertNotNull(exception);
Assert.assertNull(exception.getMessage());
@ -1653,6 +1718,24 @@ public class SqlResourceTest extends CalciteTestBase
execLatch.countDown();
response = future.get();
// The response that we get is the actual object created by the SqlResource. The StreamingOutput object that
// the SqlResource returns at the time of writing has resources opened up (the query is already running) which
// need to be closed. As such, the StreamingOutput needs to actually be called in order to cause that close
// to occur, so we must get the entity out and call `.write(OutputStream)` on it to invoke the code.
try {
((StreamingOutput) response.getEntity()).write(NullOutputStream.NULL_OUTPUT_STREAM);
}
catch (IllegalStateException e) {
// When we actually attempt to write to the output stream, we seem to run into multi-threading issues likely
// with our test setup. Instead of figuring out how to make the thing work, given that we don't actually
// care about the response, we are going to just ensure that it was the expected exception and ignore it.
// It's possible that this test starts failing suddenly if someone changes the message of the exception, it
// should be safe to just update the expected message here too if that happens.
Assert.assertEquals(
"DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or metric information from multiple threads or from an async thread, this information should explicitly be passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be reassigned explicitly",
e.getMessage()
);
}
Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
@ -1906,6 +1989,7 @@ public class SqlResourceTest extends CalciteTestBase
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier;
private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier;
private final SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier;
private final SettableSupplier<ResponseContext> responseContextSupplier;
private final Consumer<DirectStatement> onExecute;
private TestHttpStatement(
@ -1916,6 +2000,7 @@ public class SqlResourceTest extends CalciteTestBase
SettableSupplier<NonnullPair<CountDownLatch, Boolean>> planLatchSupplier,
SettableSupplier<NonnullPair<CountDownLatch, Boolean>> executeLatchSupplier,
SettableSupplier<Function<Sequence<Object[]>, Sequence<Object[]>>> sequenceMapFnSupplier,
SettableSupplier<ResponseContext> responseContextSupplier,
final Consumer<DirectStatement> onAuthorize
)
{
@ -1924,13 +2009,15 @@ public class SqlResourceTest extends CalciteTestBase
this.planLatchSupplier = planLatchSupplier;
this.executeLatchSupplier = executeLatchSupplier;
this.sequenceMapFnSupplier = sequenceMapFnSupplier;
this.responseContextSupplier = responseContextSupplier;
this.onExecute = onAuthorize;
}
@Override
protected void authorize(
DruidPlanner planner,
Function<Set<ResourceAction>, Access> authorizer)
Function<Set<ResourceAction>, Access> authorizer
)
{
if (validateAndAuthorizeLatchSupplier.get() != null) {
if (validateAndAuthorizeLatchSupplier.get().rhs) {
@ -1955,14 +2042,15 @@ public class SqlResourceTest extends CalciteTestBase
@Override
public PlannerResult createPlan(DruidPlanner planner)
{
if (planLatchSupplier.get() != null) {
if (planLatchSupplier.get().rhs) {
final NonnullPair<CountDownLatch, Boolean> planLatch = planLatchSupplier.get();
if (planLatch != null) {
if (planLatch.rhs) {
PlannerResult result = super.createPlan(planner);
planLatchSupplier.get().lhs.countDown();
planLatch.lhs.countDown();
return result;
} else {
try {
if (!planLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
if (!planLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
throw new RuntimeException("Latch timed out");
}
}
@ -1989,30 +2077,37 @@ public class SqlResourceTest extends CalciteTestBase
return new ResultSet(plannerResult)
{
@Override
public Sequence<Object[]> run()
public QueryResponse run()
{
final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
if (executeLatchSupplier.get() != null) {
if (executeLatchSupplier.get().rhs) {
Sequence<Object[]> sequence = sequenceMapFn.apply(super.run());
executeLatchSupplier.get().lhs.countDown();
return sequence;
final NonnullPair<CountDownLatch, Boolean> executeLatch = executeLatchSupplier.get();
if (executeLatch != null) {
if (executeLatch.rhs) {
final QueryResponse resp = super.run();
Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
executeLatch.lhs.countDown();
final ResponseContext respContext = resp.getResponseContext();
respContext.merge(responseContextSupplier.get());
return new QueryResponse(sequence, respContext);
} else {
try {
if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
throw new RuntimeException("Latch timed out");
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return sequenceMapFn.apply(super.run());
}
} else {
return sequenceMapFn.apply(super.run());
}
final QueryResponse resp = super.run();
Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
final ResponseContext respContext = resp.getResponseContext();
respContext.merge(responseContextSupplier.get());
return new QueryResponse(sequence, respContext);
}
};
}