From 5ba0075c0c5054b7f11b02201e80e952dcf0e691 Mon Sep 17 00:00:00 2001 From: imply-cheddar <86940447+imply-cheddar@users.noreply.github.com> Date: Mon, 12 Sep 2022 17:40:06 +0900 Subject: [PATCH] Expose HTTP Response headers from SqlResource (#13052) * Expose HTTP Response headers from SqlResource This change makes the SqlResource expose HTTP response headers in the same way that the QueryResource exposes them. Fundamentally, the change is to pipe the QueryResponse object all the way through to the Resource so that it can populate response headers. There is also some code cleanup around DI, as there was a superfluous FactoryFactory class muddying things up. --- .../druid/benchmark/query/SqlBenchmark.java | 2 +- .../query/SqlExpressionBenchmark.java | 2 +- .../query/SqlNestedDataBenchmark.java | 2 +- .../benchmark/query/SqlVsNativeBenchmark.java | 2 +- .../druid/guice/annotations/NativeQuery.java | 40 ++++ .../apache/druid/guice/annotations/MSQ.java | 40 ++++ .../apache/druid/msq/guice/MSQSqlModule.java | 20 ++ .../druid/msq/sql/MSQTaskQueryMaker.java | 6 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 2 - .../apache/druid/msq/sql/SqlTaskResource.java | 11 +- .../apache/druid/msq/test/MSQTestBase.java | 17 +- .../druid/query/context/ResponseContext.java | 5 +- .../apache/druid/query/TestBufferPool.java | 36 +-- .../query/groupby/TestGroupByBuffers.java | 4 +- .../apache/druid/server/QueryLifecycle.java | 55 ++--- .../apache/druid/server/QueryResource.java | 80 ++++--- .../apache/druid/server/QueryResponse.java | 50 +++++ .../AuthenticatorMapperModule.java | 15 +- .../server/security/AuthenticatorMapper.java | 2 - .../server/security/AuthorizerMapper.java | 3 - sql/pom.xml | 5 + .../org/apache/druid/sql/DirectStatement.java | 6 +- .../apache/druid/sql/SqlStatementFactory.java | 38 +++- .../druid/sql/SqlStatementFactoryFactory.java | 105 --------- .../druid/sql/avatica/DruidJdbcResultSet.java | 2 +- .../apache/druid/sql/avatica/DruidMeta.java | 13 +- .../sql/calcite/planner/DruidPlanner.java | 55 ++--- .../sql/calcite/planner/PlannerResult.java | 8 +- .../druid/sql/calcite/rel/DruidRel.java | 4 +- .../druid/sql/calcite/rel/DruidUnionRel.java | 55 ++++- .../sql/calcite/run/NativeQueryMaker.java | 45 ++-- .../druid/sql/calcite/run/QueryMaker.java | 4 +- .../calcite/schema/SegmentMetadataCache.java | 3 +- .../org/apache/druid/sql/guice/SqlModule.java | 82 ++++++- .../apache/druid/sql/http/SqlHttpModule.java | 2 + .../apache/druid/sql/http/SqlResource.java | 91 ++++---- .../apache/druid/sql/SqlStatementTest.java | 28 +-- .../sql/avatica/DruidAvaticaHandlerTest.java | 5 +- .../sql/calcite/BaseCalciteQueryTest.java | 2 +- .../sql/calcite/CalciteJoinQueryTest.java | 2 +- .../SqlVectorizedExpressionSanityTest.java | 4 +- .../sql/calcite/TestInsertQueryMaker.java | 8 +- .../schema/SegmentMetadataCacheTest.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 23 +- .../apache/druid/sql/guice/SqlModuleTest.java | 10 +- .../druid/sql/http/SqlHttpModuleTest.java | 27 +-- .../druid/sql/http/SqlResourceTest.java | 205 +++++++++++++----- 47 files changed, 771 insertions(+), 459 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java create mode 100644 server/src/main/java/org/apache/druid/server/QueryResponse.java delete mode 100644 sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java index 8b65ba0ebe2..35974e4430d 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java @@ -518,7 +518,7 @@ public class SqlBenchmark final String sql = QUERIES.get(Integer.parseInt(query)); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) { final PlannerResult plannerResult = planner.plan(); - final Sequence resultSequence = plannerResult.run(); + final Sequence resultSequence = plannerResult.run().getResults(); final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in); blackhole.consume(lastRow); } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index f7e3a3a7a4a..e1c27afc988 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -354,7 +354,7 @@ public class SqlExpressionBenchmark final String sql = QUERIES.get(Integer.parseInt(query)); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) { final PlannerResult plannerResult = planner.plan(); - final Sequence resultSequence = plannerResult.run(); + final Sequence resultSequence = plannerResult.run().getResults(); final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in); blackhole.consume(lastRow); } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java index a4876421c60..ab3f5de9cef 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java @@ -320,7 +320,7 @@ public class SqlNestedDataBenchmark final String sql = QUERIES.get(Integer.parseInt(query)); try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sql, new QueryContext(context))) { final PlannerResult plannerResult = planner.plan(); - final Sequence resultSequence = plannerResult.run(); + final Sequence resultSequence = plannerResult.run().getResults(); final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in); blackhole.consume(lastRow); } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java index d7d2d7ad2cf..b11188eb98c 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java @@ -169,7 +169,7 @@ public class SqlVsNativeBenchmark { try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sqlQuery, new QueryContext())) { final PlannerResult plannerResult = planner.plan(); - final Sequence resultSequence = plannerResult.run(); + final Sequence resultSequence = plannerResult.run().getResults(); final Object[] lastRow = resultSequence.accumulate(null, (accumulated, in) -> in); blackhole.consume(lastRow); } diff --git a/core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java b/core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java new file mode 100644 index 00000000000..60b0b191a3e --- /dev/null +++ b/core/src/main/java/org/apache/druid/guice/annotations/NativeQuery.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Binding annotation for implements of interfaces that are focused on running native queries. This is generally + * contrasted with the MSQ annotation. + * + * @see Parent + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface NativeQuery +{ +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java new file mode 100644 index 00000000000..c480168de25 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/guice/annotations/MSQ.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Binding annotation for implements of interfaces that are MSQ (MultiStageQuery) focused. This is generally + * contrasted with the NativeQ annotation. + * + * @see Parent + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface MSQ +{ +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java index 9aea33f4072..e2c47c2a2dd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQSqlModule.java @@ -22,10 +22,16 @@ package org.apache.druid.msq.guice; import com.fasterxml.jackson.databind.Module; import com.google.inject.Binder; import com.google.inject.Inject; +import com.google.inject.Provides; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.LoadScope; +import org.apache.druid.guice.annotations.MSQ; import org.apache.druid.initialization.DruidModule; import org.apache.druid.metadata.input.InputSourceModule; +import org.apache.druid.msq.sql.MSQTaskSqlEngine; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; import org.apache.druid.sql.guice.SqlBindings; @@ -54,7 +60,21 @@ public class MSQSqlModule implements DruidModule // We want this module to bring InputSourceModule along for the ride. binder.install(new InputSourceModule()); + binder.bind(MSQTaskSqlEngine.class).in(LazySingleton.class); + // Set up the EXTERN macro. SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class); } + + + @Provides + @MSQ + @LazySingleton + public SqlStatementFactory makeMSQSqlStatementFactory( + final MSQTaskSqlEngine engine, + SqlToolbox toolbox + ) + { + return new SqlStatementFactory(toolbox.withEngine(engine)); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 53bb5bf8461..e2754cacbeb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.indexing.ColumnMapping; @@ -49,6 +48,7 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.QueryResponse; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; import org.apache.druid.sql.calcite.planner.PlannerContext; @@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker } @Override - public Sequence runQuery(final DruidQuery druidQuery) + public QueryResponse runQuery(final DruidQuery druidQuery) { String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId()); @@ -259,7 +259,7 @@ public class MSQTaskQueryMaker implements QueryMaker ); FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), true); - return Sequences.simple(Collections.singletonList(new Object[]{taskId})); + return QueryResponse.withEmptyContext(Sequences.simple(Collections.singletonList(new Object[]{taskId}))); } private static Map buildAggregationIntermediateTypeMap(final DruidQuery druidQuery) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 593eccac4fb..226c4ffcdb1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -32,7 +32,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.tools.ValidationException; import org.apache.calcite.util.Pair; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -55,7 +54,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -@LazySingleton public class MSQTaskSqlEngine implements SqlEngine { public static final Set SYSTEM_CONTEXT_PARAMETERS = diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java index e49fda973a9..17250608060 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java @@ -25,6 +25,7 @@ import com.google.common.io.CountingOutputStream; import com.google.inject.Inject; import org.apache.calcite.plan.RelOptPlanner; import org.apache.druid.common.exception.SanitizableException; +import org.apache.druid.guice.annotations.MSQ; import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; @@ -37,6 +38,7 @@ import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.server.QueryResponse; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthorizationUtils; @@ -47,7 +49,6 @@ import org.apache.druid.sql.HttpStatement; import org.apache.druid.sql.SqlPlanningException; import org.apache.druid.sql.SqlRowTransformer; import org.apache.druid.sql.SqlStatementFactory; -import org.apache.druid.sql.SqlStatementFactoryFactory; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.sql.http.SqlResource; @@ -91,14 +92,13 @@ public class SqlTaskResource @Inject public SqlTaskResource( - final MSQTaskSqlEngine engine, - final SqlStatementFactoryFactory sqlStatementFactoryFactory, + final @MSQ SqlStatementFactory sqlStatementFactory, final ServerConfig serverConfig, final AuthorizerMapper authorizerMapper, final ObjectMapper jsonMapper ) { - this.sqlStatementFactory = sqlStatementFactoryFactory.factorize(engine); + this.sqlStatementFactory = sqlStatementFactory; this.serverConfig = serverConfig; this.authorizerMapper = authorizerMapper; this.jsonMapper = jsonMapper; @@ -147,7 +147,8 @@ public class SqlTaskResource final String sqlQueryId = stmt.sqlQueryId(); try { final DirectStatement.ResultSet plan = stmt.plan(); - final Sequence sequence = plan.run(); + final QueryResponse response = plan.run(); + final Sequence sequence = response.getResults(); final SqlRowTransformer rowTransformer = plan.createRowTransformer(); final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index ae641fc7c79..880c6f4adea 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -35,6 +35,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.TypeLiteral; +import com.google.inject.util.Modules; import com.google.inject.util.Providers; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -47,6 +48,7 @@ import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.guice.JoinableFactoryModule; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.MSQ; import org.apache.druid.guice.annotations.Self; import org.apache.druid.hll.HyperLogLogCollector; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; @@ -125,6 +127,7 @@ import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.sql.DirectStatement; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.BaseCalciteQueryTest; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; @@ -341,7 +344,17 @@ public class MSQTestBase extends BaseCalciteQueryTest new JoinableFactoryModule(), new IndexingServiceTuningConfigModule(), new MSQIndexingModule(), - new MSQSqlModule(), + Modules.override(new MSQSqlModule()).with( + binder -> { + // Our Guice configuration currently requires bindings to exist even if they aren't ever used, the + // following bindings are overriding other bindings that end up needing a lot more dependencies. + // We replace the bindings with something that returns null to make things more brittle in case they + // actually are used somewhere in the test. + binder.bind(SqlStatementFactory.class).annotatedWith(MSQ.class).toProvider(Providers.of(null)); + binder.bind(SqlToolbox.class).toProvider(Providers.of(null)); + binder.bind(MSQTaskSqlEngine.class).toProvider(Providers.of(null)); + } + ), new MSQExternalDataSourceModule() )); @@ -580,7 +593,7 @@ public class MSQTestBase extends BaseCalciteQueryTest ) ); - final List sequence = stmt.execute().toList(); + final List sequence = stmt.execute().getResults().toList(); return (String) Iterables.getOnlyElement(sequence)[0]; } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 163b70383ef..a943297d173 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -39,7 +39,6 @@ import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; @@ -733,6 +732,10 @@ public abstract class ResponseContext */ public void merge(ResponseContext responseContext) { + if (responseContext == null) { + return; + } + responseContext.getDelegate().forEach((key, newValue) -> { if (newValue != null) { add(key, newValue); diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index e841397c0e1..10690d31be1 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -20,7 +20,6 @@ package org.apache.druid.query; import com.google.common.collect.Iterables; -import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.ReferenceCountingResourceHolder; @@ -31,26 +30,29 @@ import org.apache.druid.utils.CloseableUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; /** * A buffer pool that throws away buffers when they are "returned" to the pool. Useful for tests that need to make * many pools and use them one at a time. - * + *

* This pool implements {@link BlockingPool}, but never blocks. It returns immediately if resources are available; * otherwise it returns an empty list immediately. This is also useful for tests, because it allows "timeouts" to * happen immediately and therefore speeds up tests. */ public class TestBufferPool implements NonBlockingPool, BlockingPool { + private final AtomicLong takeCount = new AtomicLong(0); + private final ConcurrentHashMap takenFromMap = new ConcurrentHashMap<>(); + private final Supplier> generator; private final int maxCount; - @GuardedBy("this") - private long numOutstanding; - private TestBufferPool(final Supplier> generator, final int maxCount) { this.generator = generator; @@ -60,7 +62,8 @@ public class TestBufferPool implements NonBlockingPool, BlockingPool public static TestBufferPool onHeap(final int bufferSize, final int maxCount) { return new TestBufferPool( - () -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> {}), + () -> new ReferenceCountingResourceHolder<>(ByteBuffer.allocate(bufferSize), () -> { + }), maxCount ); } @@ -102,20 +105,20 @@ public class TestBufferPool implements NonBlockingPool, BlockingPool public List> takeBatch(int elementNum) { synchronized (this) { - if (numOutstanding + elementNum <= maxCount) { + if (takenFromMap.size() + elementNum <= maxCount) { final List> retVal = new ArrayList<>(); try { for (int i = 0; i < elementNum; i++) { final ResourceHolder holder = generator.get(); final ByteBuffer o = holder.get(); + final long ticker = takeCount.getAndIncrement(); + takenFromMap.put(ticker, new RuntimeException()); + retVal.add(new ReferenceCountingResourceHolder<>(o, () -> { - synchronized (this) { - numOutstanding--; - holder.close(); - } + takenFromMap.remove(ticker); + holder.close(); })); - numOutstanding++; } } catch (Throwable e) { @@ -131,8 +134,11 @@ public class TestBufferPool implements NonBlockingPool, BlockingPool public long getOutstandingObjectCount() { - synchronized (this) { - return numOutstanding; - } + return takenFromMap.size(); + } + + public Collection getOutstandingExceptionsCreated() + { + return takenFromMap.values(); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java b/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java index f83160b1951..c25d66b668a 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/TestGroupByBuffers.java @@ -85,7 +85,9 @@ public class TestGroupByBuffers implements Closeable } if (mergePool != null) { - Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); + if (mergePool.getOutstandingObjectCount() != 0) { + throw mergePool.getOutstandingExceptionsCreated().iterator().next(); + } mergePool = null; } } diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index 1bdba451519..b4d80d01d93 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -141,7 +141,7 @@ public class QueryLifecycle * @return results */ @SuppressWarnings("unchecked") - public Sequence runSimple( + public QueryResponse runSimple( final Query query, final AuthenticationResult authenticationResult, final Access authorizationResult @@ -151,13 +151,14 @@ public class QueryLifecycle final Sequence results; + final QueryResponse queryResponse; try { preAuthorized(authenticationResult, authorizationResult); if (!authorizationResult.isAllowed()) { throw new ISE("Unauthorized"); } - final QueryResponse queryResponse = execute(); + queryResponse = execute(); results = queryResponse.getResults(); } catch (Throwable e) { @@ -165,16 +166,25 @@ public class QueryLifecycle throw e; } - return Sequences.wrap( - results, - new SequenceWrapper() - { - @Override - public void after(final boolean isDone, final Throwable thrown) - { - emitLogsAndMetrics(thrown, null, -1); - } - } + /* + * It seems extremely weird that the below code is wrapping the Sequence in order to emitLogsAndMetrics. + * The Sequence was returned by the call to execute, it would be worthwile to figure out why this wrapping + * cannot be moved into execute(). We leave this as an exercise for the future, however as this oddity + * was discovered while just trying to expose HTTP response headers + */ + return new QueryResponse( + Sequences.wrap( + results, + new SequenceWrapper() + { + @Override + public void after(final boolean isDone, final Throwable thrown) + { + emitLogsAndMetrics(thrown, null, -1); + } + } + ), + queryResponse.getResponseContext() ); } @@ -439,25 +449,4 @@ public class QueryLifecycle DONE } - public static class QueryResponse - { - private final Sequence results; - private final ResponseContext responseContext; - - private QueryResponse(final Sequence results, final ResponseContext responseContext) - { - this.results = results; - this.responseContext = responseContext; - } - - public Sequence getResults() - { - return results; - } - - public ResponseContext getResponseContext() - { - return responseContext; - } - } } diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 590347735b8..ea225efbae1 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -200,7 +200,7 @@ public class QueryResource implements QueryCountStatsProvider throw new ForbiddenException(authResult.toString()); } - final QueryLifecycle.QueryResponse queryResponse = queryLifecycle.execute(); + final QueryResponse queryResponse = queryLifecycle.execute(); final Sequence results = queryResponse.getResults(); final ResponseContext responseContext = queryResponse.getResponseContext(); final String prevEtag = getPreviousEtag(req); @@ -255,41 +255,11 @@ public class QueryResource implements QueryCountStatsProvider ) .header(QUERY_ID_RESPONSE_HEADER, queryId); - transferEntityTag(responseContext, responseBuilder); - - DirectDruidClient.removeMagicResponseContextFields(responseContext); - - // Limit the response-context header, see https://github.com/apache/druid/issues/2331 - // Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() - // and encodes the string using ASCII, so 1 char is = 1 byte - final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith( - jsonMapper, - responseContextConfig.getMaxResponseContextHeaderSize() + attachResponseContextToHttpResponse(queryId, responseContext, responseBuilder, jsonMapper, + responseContextConfig, selfNode ); - if (serializationResult.isTruncated()) { - final String logToPrint = StringUtils.format( - "Response Context truncated for id [%s]. Full context is [%s].", - queryId, - serializationResult.getFullResult() - ); - if (responseContextConfig.shouldFailOnTruncatedResponseContext()) { - log.error(logToPrint); - throw new QueryInterruptedException( - new TruncatedResponseContextException( - "Serialized response context exceeds the max size[%s]", - responseContextConfig.getMaxResponseContextHeaderSize() - ), - selfNode.getHostAndPortToUse() - ); - } else { - log.warn(logToPrint); - } - } - - return responseBuilder - .header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult()) - .build(); + return responseBuilder.build(); } catch (QueryException e) { // make sure to close yielder if anything happened before starting to serialize the response. @@ -358,6 +328,48 @@ public class QueryResource implements QueryCountStatsProvider } } + public static void attachResponseContextToHttpResponse( + String queryId, + ResponseContext responseContext, + Response.ResponseBuilder responseBuilder, + ObjectMapper jsonMapper, ResponseContextConfig responseContextConfig, DruidNode selfNode + ) throws JsonProcessingException + { + transferEntityTag(responseContext, responseBuilder); + + DirectDruidClient.removeMagicResponseContextFields(responseContext); + + // Limit the response-context header, see https://github.com/apache/druid/issues/2331 + // Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() + // and encodes the string using ASCII, so 1 char is = 1 byte + final ResponseContext.SerializationResult serializationResult = responseContext.serializeWith( + jsonMapper, + responseContextConfig.getMaxResponseContextHeaderSize() + ); + + if (serializationResult.isTruncated()) { + final String logToPrint = StringUtils.format( + "Response Context truncated for id [%s]. Full context is [%s].", + queryId, + serializationResult.getFullResult() + ); + if (responseContextConfig.shouldFailOnTruncatedResponseContext()) { + log.error(logToPrint); + throw new QueryInterruptedException( + new TruncatedResponseContextException( + "Serialized response context exceeds the max size[%s]", + responseContextConfig.getMaxResponseContextHeaderSize() + ), + selfNode.getHostAndPortToUse() + ); + } else { + log.warn(logToPrint); + } + } + + responseBuilder.header(HEADER_RESPONSE_CONTEXT, serializationResult.getResult()); + } + private Query readQuery( final HttpServletRequest req, final InputStream in, diff --git a/server/src/main/java/org/apache/druid/server/QueryResponse.java b/server/src/main/java/org/apache/druid/server/QueryResponse.java new file mode 100644 index 00000000000..69908aee945 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/QueryResponse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.context.ResponseContext; + +public class QueryResponse +{ + public static QueryResponse withEmptyContext(Sequence results) + { + return new QueryResponse(results, ResponseContext.createEmpty()); + } + + private final Sequence results; + private final ResponseContext responseContext; + + public QueryResponse(final Sequence results, final ResponseContext responseContext) + { + this.results = results; + this.responseContext = responseContext; + } + + public Sequence getResults() + { + return results; + } + + public ResponseContext getResponseContext() + { + return responseContext; + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java b/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java index 56c17218e94..c2e240d8e99 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java @@ -23,12 +23,10 @@ import com.google.common.base.Supplier; import com.google.common.collect.Maps; import com.google.inject.Binder; import com.google.inject.Inject; -import com.google.inject.Injector; import com.google.inject.Provider; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.guice.LifecycleModule; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -54,22 +52,18 @@ public class AuthenticatorMapperModule implements DruidModule binder.bind(AuthenticatorMapper.class) .toProvider(new AuthenticatorMapperProvider()) .in(LazySingleton.class); - - LifecycleModule.register(binder, AuthenticatorMapper.class); } private static class AuthenticatorMapperProvider implements Provider { private AuthConfig authConfig; - private Injector injector; private Properties props; private JsonConfigurator configurator; @Inject - public void inject(Injector injector, Properties props, JsonConfigurator configurator) + public void inject(AuthConfig authConfig, Properties props, JsonConfigurator configurator) { - this.authConfig = injector.getInstance(AuthConfig.class); - this.injector = injector; + this.authConfig = authConfig; this.props = props; this.configurator = configurator; } @@ -91,7 +85,10 @@ public class AuthenticatorMapperModule implements DruidModule } for (String authenticatorName : authenticators) { - final String authenticatorPropertyBase = StringUtils.format(AUTHENTICATOR_PROPERTIES_FORMAT_STRING, authenticatorName); + final String authenticatorPropertyBase = StringUtils.format( + AUTHENTICATOR_PROPERTIES_FORMAT_STRING, + authenticatorName + ); final JsonConfigProvider authenticatorProvider = JsonConfigProvider.of( authenticatorPropertyBase, Authenticator.class diff --git a/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java b/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java index 5b59fe65e46..4e88869135d 100644 --- a/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java +++ b/server/src/main/java/org/apache/druid/server/security/AuthenticatorMapper.java @@ -20,12 +20,10 @@ package org.apache.druid.server.security; import com.google.common.collect.Lists; -import org.apache.druid.guice.ManageLifecycle; import java.util.List; import java.util.Map; -@ManageLifecycle public class AuthenticatorMapper { private Map authenticatorMap; diff --git a/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java b/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java index e888cc14f9d..5ab2d1d2e10 100644 --- a/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java +++ b/server/src/main/java/org/apache/druid/server/security/AuthorizerMapper.java @@ -19,11 +19,8 @@ package org.apache.druid.server.security; -import org.apache.druid.guice.ManageLifecycle; - import java.util.Map; -@ManageLifecycle public class AuthorizerMapper { private Map authorizerMap; diff --git a/sql/pom.xml b/sql/pom.xml index 846b44ef7f6..bbf2eb049ed 100644 --- a/sql/pom.xml +++ b/sql/pom.xml @@ -208,6 +208,11 @@ easymock test + + commons-io + commons-io + test + org.apache.druid druid-core diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java index d73761c825a..e173213a0cc 100644 --- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java @@ -23,9 +23,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.tools.ValidationException; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.server.QueryResponse; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.SqlLifecycleManager.Cancelable; import org.apache.druid.sql.calcite.planner.DruidPlanner; @@ -98,7 +98,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable * Do the actual execute step which allows subclasses to wrap the sequence, * as is sometimes needed for testing. */ - public Sequence run() + public QueryResponse run() { try { // Check cancellation. Required for SqlResourceTest to work. @@ -176,7 +176,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable * * @return sequence which delivers query results */ - public Sequence execute() + public QueryResponse execute() { return plan().run(); } diff --git a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java b/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java index b1ecde6f4c6..c8450d62166 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlStatementFactory.java @@ -23,11 +23,41 @@ import org.apache.druid.sql.http.SqlQuery; import javax.servlet.http.HttpServletRequest; -public interface SqlStatementFactory +/** + * A class for the creation of Statements, which happen to be used for Sql. + */ +public class SqlStatementFactory { - HttpStatement httpStatement(SqlQuery sqlQuery, HttpServletRequest req); + private final SqlToolbox lifecycleToolbox; - DirectStatement directStatement(SqlQueryPlus sqlRequest); + /** + * The construction of these objects in the production code is a bit circuitous. Specifically, the SqlToolbox + * looks like it can be normally injected, except it actually expects to be mutated with a SqlEngine before being + * injected. This is generally accomplished with Guice, examples of which can be seen in the + * SqlStatementFactoryModule. + * + * @param lifecycleToolbox + */ + public SqlStatementFactory(SqlToolbox lifecycleToolbox) + { + this.lifecycleToolbox = lifecycleToolbox; + } - PreparedStatement preparedStatement(SqlQueryPlus sqlRequest); + public HttpStatement httpStatement( + final SqlQuery sqlQuery, + final HttpServletRequest req + ) + { + return new HttpStatement(lifecycleToolbox, sqlQuery, req); + } + + public DirectStatement directStatement(final SqlQueryPlus sqlRequest) + { + return new DirectStatement(lifecycleToolbox, sqlRequest); + } + + public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest) + { + return new PreparedStatement(lifecycleToolbox, sqlRequest); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java b/sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java deleted file mode 100644 index 5d32735cfe8..00000000000 --- a/sql/src/main/java/org/apache/druid/sql/SqlStatementFactoryFactory.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.sql; - -import com.google.common.base.Supplier; -import com.google.inject.Inject; -import org.apache.druid.guice.LazySingleton; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.query.DefaultQueryConfig; -import org.apache.druid.server.QueryScheduler; -import org.apache.druid.server.log.RequestLogger; -import org.apache.druid.server.security.AuthConfig; -import org.apache.druid.sql.calcite.planner.PlannerFactory; -import org.apache.druid.sql.calcite.run.SqlEngine; -import org.apache.druid.sql.http.SqlQuery; - -import javax.servlet.http.HttpServletRequest; - -/** - * Factory factories: when design patterns go too far. - * - * Almost everything we need to create a {@link SqlStatementFactory} is injectable, except for the {@link SqlEngine}. - * So this class exists to produce {@link SqlStatementFactory} once the engine for a query is known. - */ -@LazySingleton -public class SqlStatementFactoryFactory -{ - protected final SqlToolbox lifecycleToolbox; - - @Inject - public SqlStatementFactoryFactory( - final PlannerFactory plannerFactory, - final ServiceEmitter emitter, - final RequestLogger requestLogger, - final QueryScheduler queryScheduler, - final AuthConfig authConfig, - final Supplier defaultQueryConfig, - final SqlLifecycleManager sqlLifecycleManager - ) - { - this.lifecycleToolbox = new SqlToolbox( - null, - plannerFactory, - emitter, - requestLogger, - queryScheduler, - authConfig, - defaultQueryConfig.get(), - sqlLifecycleManager - ); - } - - public SqlStatementFactory factorize(final SqlEngine engine) - { - return new FactoryImpl(lifecycleToolbox.withEngine(engine)); - } - - private static class FactoryImpl implements SqlStatementFactory - { - private final SqlToolbox lifecycleToolbox; - - public FactoryImpl(SqlToolbox lifecycleToolbox) - { - this.lifecycleToolbox = lifecycleToolbox; - } - - @Override - public HttpStatement httpStatement( - final SqlQuery sqlQuery, - final HttpServletRequest req - ) - { - return new HttpStatement(lifecycleToolbox, sqlQuery, req); - } - - @Override - public DirectStatement directStatement(final SqlQueryPlus sqlRequest) - { - return new DirectStatement(lifecycleToolbox, sqlRequest); - } - - @Override - public PreparedStatement preparedStatement(final SqlQueryPlus sqlRequest) - { - return new PreparedStatement(lifecycleToolbox, sqlRequest); - } - } -} diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java index 36a69dd8154..95005b7bf59 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java @@ -103,7 +103,7 @@ public class DruidJdbcResultSet implements Closeable ensure(State.NEW); try { state = State.RUNNING; - final Sequence baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get(); + final Sequence baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get().getResults(); // We can't apply limits greater than Integer.MAX_VALUE, ignore them. final Sequence retSequence = diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java index 89239a0beef..75e4d70c5b7 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; -import com.google.inject.Injector; import org.apache.calcite.avatica.AvaticaSeverity; import org.apache.calcite.avatica.MetaImpl; import org.apache.calcite.avatica.MissingResultsException; @@ -38,6 +37,7 @@ import org.apache.calcite.avatica.remote.AvaticaRuntimeException; import org.apache.calcite.avatica.remote.Service.ErrorResponse; import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.annotations.NativeQuery; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.UOE; @@ -49,10 +49,8 @@ import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; -import org.apache.druid.sql.SqlStatementFactoryFactory; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.run.NativeSqlEngine; import org.joda.time.Interval; import javax.annotation.Nonnull; @@ -125,15 +123,14 @@ public class DruidMeta extends MetaImpl @Inject public DruidMeta( - final NativeSqlEngine engine, - final SqlStatementFactoryFactory sqlStatementFactoryFactory, + final @NativeQuery SqlStatementFactory sqlStatementFactory, final AvaticaServerConfig config, final ErrorHandler errorHandler, - final Injector injector + final AuthenticatorMapper authMapper ) { this( - sqlStatementFactoryFactory.factorize(engine), + sqlStatementFactory, config, errorHandler, Executors.newSingleThreadScheduledExecutor( @@ -142,7 +139,7 @@ public class DruidMeta extends MetaImpl .setDaemon(true) .build() ), - injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain() + authMapper.getAuthenticatorChain() ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 07f2a9dcb08..75be75855c2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -68,12 +68,12 @@ import org.apache.druid.common.utils.IdUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.BaseSequence; -import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.Query; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.server.QueryResponse; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; @@ -462,7 +462,7 @@ public class DruidPlanner implements Closeable } // Start the query. - final Supplier> resultsSupplier = () -> { + final Supplier resultsSupplier = () -> { // sanity check final Set readResourceActions = plannerContext.getResourceActions() @@ -536,38 +536,40 @@ public class DruidPlanner implements Closeable planner.getTypeFactory(), plannerContext.getParameters() ); - final Supplier> resultsSupplier = () -> { + final Supplier resultsSupplier = () -> { final Enumerable enumerable = theRel.bind(dataContext); final Enumerator enumerator = enumerable.enumerator(); - return Sequences.withBaggage(new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public EnumeratorIterator make() - { - return new EnumeratorIterator<>(new Iterator() + return QueryResponse.withEmptyContext(Sequences.withBaggage( + new BaseSequence<>( + new BaseSequence.IteratorMaker>() { @Override - public boolean hasNext() + public EnumeratorIterator make() { - return enumerator.moveNext(); + return new EnumeratorIterator<>(new Iterator() + { + @Override + public boolean hasNext() + { + return enumerator.moveNext(); + } + + @Override + public Object[] next() + { + return (Object[]) enumerator.current(); + } + }); } @Override - public Object[] next() + public void cleanup(EnumeratorIterator iterFromMake) { - return (Object[]) enumerator.current(); + } - }); - } - - @Override - public void cleanup(EnumeratorIterator iterFromMake) - { - - } - } - ), enumerator::close); + } + ), enumerator::close) + ); }; return new PlannerResult(resultsSupplier, root.validatedRowType); } @@ -606,8 +608,9 @@ public class DruidPlanner implements Closeable log.error(jpe, "Encountered exception while serializing Resources for explain output"); resourcesString = null; } - final Supplier> resultsSupplier = Suppliers.ofInstance( - Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString}))); + final Supplier resultsSupplier = Suppliers.ofInstance( + QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString}))) + ); return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory())); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java index 619f6c5509c..57e3ba2cbc9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java @@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.planner; import com.google.common.base.Supplier; import org.apache.calcite.rel.type.RelDataType; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.server.QueryResponse; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,12 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class PlannerResult { - private final Supplier> resultsSupplier; + private final Supplier resultsSupplier; private final RelDataType rowType; private final AtomicBoolean didRun = new AtomicBoolean(); public PlannerResult( - final Supplier> resultsSupplier, + final Supplier resultsSupplier, final RelDataType rowType ) { @@ -53,7 +53,7 @@ public class PlannerResult /** * Run the query */ - public Sequence run() + public QueryResponse run() { if (!didRun.compareAndSet(false, true)) { // Safety check. diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java index 6f601ec5aa5..9043577a7da 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java @@ -22,7 +22,7 @@ package org.apache.druid.sql.calcite.rel; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.AbstractRelNode; -import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.server.QueryResponse; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; @@ -45,7 +45,7 @@ public abstract class DruidRel extends AbstractRelNode @Nullable public abstract PartialDruidQuery getPartialDruidQuery(); - public Sequence runQuery() + public QueryResponse runQuery() { // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this // is the outermost query, and it will actually get run as a native query. Druid's native query layer will diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java index 25e6e9f5232..de1bc8b758e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.rel; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; @@ -30,14 +31,19 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.server.QueryResponse; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -47,7 +53,7 @@ import java.util.stream.Collectors; * but rather, it represents the concatenation of a series of native queries in the SQL layer. Therefore, * {@link #getPartialDruidQuery()} returns null, and this rel cannot be built on top of. It must be the outer rel in a * query plan. - * + *

* See {@link DruidUnionDataSourceRel} for a version that does a regular Druid query using a {@link UnionDataSource}. * In the future we expect that {@link UnionDataSource} will gain the ability to union query datasources together, and * then this rel could be replaced by {@link DruidUnionDataSourceRel}. @@ -100,18 +106,51 @@ public class DruidUnionRel extends DruidRel } @Override - @SuppressWarnings("unchecked") - public Sequence runQuery() + @SuppressWarnings({"unchecked", "rawtypes"}) + public QueryResponse runQuery() { // Lazy: run each query in sequence, not all at once. if (limit == 0) { - return Sequences.empty(); + return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty()); } else { - final Sequence baseSequence = Sequences.concat( - FluentIterable.from(rels).transform(rel -> ((DruidRel) rel).runQuery()) - ); - return limit > 0 ? baseSequence.limit(limit) : baseSequence; + // We run the first rel here for two reasons: + // 1) So that we get things running as normally expected when runQuery() is called + // 2) So that we have a QueryResponse to return, note that the response headers from the query will only + // have values from this first query and will not contain values from subsequent queries. This is definitely + // sub-optimal, the other option would be to fire off all queries and combine their QueryResponses, but that + // is also sub-optimal as it would consume parallel query resources and potentially starve the system. + // Instead, we only return the headers from the first query and potentially exception out and fail the query + // if there are any response headers that come from subsequent queries that are correctness concerns + final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery(); + + final List> firstAsList = Collections.singletonList(queryResponse.getResults()); + final Iterable> theRestTransformed = FluentIterable + .from(rels.subList(1, rels.size())) + .transform( + rel -> { + final QueryResponse response = ((DruidRel) rel).runQuery(); + + final ResponseContext nextContext = response.getResponseContext(); + final List uncoveredIntervals = nextContext.getUncoveredIntervals(); + if (uncoveredIntervals == null || uncoveredIntervals.isEmpty()) { + return response.getResults(); + } else { + throw new ISE( + "uncoveredIntervals[%s] existed on a sub-query of a union, incomplete data, failing", + uncoveredIntervals + ); + } + } + ); + + final Iterable> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed); + + final Sequence returnSequence = Sequences.concat(recombinedSequences); + return new QueryResponse( + limit > 0 ? returnSequence.limit(limit) : returnSequence, + queryResponse.getResponseContext() + ); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java index 1a78705ab7b..f045769ec93 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java @@ -54,6 +54,7 @@ import org.apache.druid.segment.data.ComparableList; import org.apache.druid.segment.data.ComparableStringArray; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.QueryResponse; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.sql.calcite.planner.Calcites; @@ -93,7 +94,7 @@ public class NativeQueryMaker implements QueryMaker } @Override - public Sequence runQuery(final DruidQuery druidQuery) + public QueryResponse runQuery(final DruidQuery druidQuery) { final Query query = druidQuery.getQuery(); @@ -172,7 +173,7 @@ public class NativeQueryMaker implements QueryMaker .orElseGet(query::getIntervals); } - private Sequence execute(Query query, final List newFields, final List newTypes) + private QueryResponse execute(Query query, final List newFields, final List newTypes) { Hook.QUERY_PLAN.run(query); @@ -194,23 +195,22 @@ public class NativeQueryMaker implements QueryMaker // otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do // array-based results before starting the query; but in practice we don't expect this to happen since we keep // tight control over which query types we generate in the SQL layer. They all support array-based results.) - final Sequence results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult); + final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult); - //noinspection unchecked - final QueryToolChest> toolChest = queryLifecycle.getToolChest(); - final List resultArrayFields = toolChest.resultArraySignature(query).getColumnNames(); - final Sequence resultArrays = toolChest.resultsAsArrays(query, results); - return mapResultSequence(resultArrays, resultArrayFields, newFields, newTypes); + return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes); } - private Sequence mapResultSequence( - final Sequence sequence, - final List originalFields, + private QueryResponse mapResultSequence( + final QueryResponse results, + final QueryToolChest> toolChest, + final Query query, final List newFields, final List newTypes ) { + final List originalFields = toolChest.resultArraySignature(query).getColumnNames(); + // Build hash map for looking up original field positions, in case the number of fields is super high. final Object2IntMap originalFieldsLookup = new Object2IntOpenHashMap<>(); originalFieldsLookup.defaultReturnValue(-1); @@ -234,15 +234,20 @@ public class NativeQueryMaker implements QueryMaker mapping[i] = idx; } - return Sequences.map( - sequence, - array -> { - final Object[] newArray = new Object[mapping.length]; - for (int i = 0; i < mapping.length; i++) { - newArray[i] = coerce(array[mapping[i]], newTypes.get(i)); - } - return newArray; - } + //noinspection unchecked + final Sequence sequence = toolChest.resultsAsArrays(query, results.getResults()); + return new QueryResponse( + Sequences.map( + sequence, + array -> { + final Object[] newArray = new Object[mapping.length]; + for (int i = 0; i < mapping.length; i++) { + newArray[i] = coerce(array[mapping[i]], newTypes.get(i)); + } + return newArray; + } + ), + results.getResponseContext() ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java index 72dae85d0eb..8acc02230cf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java @@ -19,7 +19,7 @@ package org.apache.druid.sql.calcite.run; -import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.server.QueryResponse; import org.apache.druid.sql.calcite.rel.DruidQuery; /** @@ -33,5 +33,5 @@ public interface QueryMaker * created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or * {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement. */ - Sequence runQuery(DruidQuery druidQuery); + QueryResponse runQuery(DruidQuery druidQuery); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java index 427108d9d83..5d6d386a61b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java @@ -66,7 +66,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; - import java.io.IOException; import java.util.Comparator; import java.util.EnumSet; @@ -907,7 +906,7 @@ public class SegmentMetadataCache return queryLifecycleFactory .factorize() - .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK); + .runSimple(segmentMetadataQuery, escalator.createEscalatedAuthenticationResult(), Access.OK).getResults(); } @VisibleForTesting diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java index 589ed2575d4..614ed62339d 100644 --- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java +++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java @@ -19,18 +19,30 @@ package org.apache.druid.sql.guice; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Provides; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.PolyBind; +import org.apache.druid.guice.annotations.NativeQuery; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DefaultQueryConfig; +import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.log.RequestLogger; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.sql.SqlLifecycleManager; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.avatica.AvaticaModule; import org.apache.druid.sql.calcite.aggregation.SqlAggregationModule; import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion; import org.apache.druid.sql.calcite.planner.CalcitePlannerModule; +import org.apache.druid.sql.calcite.planner.PlannerFactory; +import org.apache.druid.sql.calcite.run.NativeSqlEngine; import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule; import org.apache.druid.sql.calcite.schema.DruidSchemaManager; import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; @@ -50,17 +62,10 @@ public class SqlModule implements Module public static final String PROPERTY_SQL_SCHEMA_MANAGER_TYPE = "druid.sql.schemamanager.type"; public static final String PROPERTY_SQL_APPROX_COUNT_DISTINCT_CHOICE = "druid.sql.approxCountDistinct.function"; - @Inject private Properties props; - public SqlModule() - { - } - - @VisibleForTesting - public SqlModule( - Properties props - ) + @Inject + public void setProps(Properties props) { this.props = props; } @@ -101,6 +106,8 @@ public class SqlModule implements Module binder.install(new SqlAggregationModule()); binder.install(new DruidViewModule()); + binder.install(new SqlStatementFactoryModule()); + // QueryLookupOperatorConversion isn't in DruidOperatorTable since it needs a LookupExtractorFactoryContainerProvider injected. SqlBindings.addOperatorConversion(binder, QueryLookupOperatorConversion.class); @@ -130,4 +137,59 @@ public class SqlModule implements Module Preconditions.checkNotNull(props, "props"); return Boolean.valueOf(props.getProperty(PROPERTY_SQL_ENABLE_AVATICA, "true")); } + + /** + * We create a new class for this module so that it can be shared by tests. The structuring of the SqlModule + * at time of writing was not conducive to reuse in test code, so, instead of fixing that we just take the easy + * way out of adding the test-reusable code to this module and reuse that. + * + * Generally speaking, the injection pattern done by this module is a bit circuitous. The `SqlToolbox` acts as + * if it can be injected with all of its dependencies, but also expects to be mutated with a new SqlEngine. We + * should likely look at adjusting the object dependencies to actually depend on the SqlToolbox and create + * different Toolboxes for the different way that queries are done. But, for now, I'm not changing the interfaces. + */ + public static class SqlStatementFactoryModule implements Module + { + + @Provides + @LazySingleton + public SqlToolbox makeSqlToolbox( + final PlannerFactory plannerFactory, + final ServiceEmitter emitter, + final RequestLogger requestLogger, + final QueryScheduler queryScheduler, + final AuthConfig authConfig, + final Supplier defaultQueryConfig, + final SqlLifecycleManager sqlLifecycleManager + ) + { + return new SqlToolbox( + null, + plannerFactory, + emitter, + requestLogger, + queryScheduler, + authConfig, + defaultQueryConfig.get(), + sqlLifecycleManager + ); + } + + @Provides + @NativeQuery + @LazySingleton + public SqlStatementFactory makeNativeSqlStatementFactory( + final NativeSqlEngine sqlEngine, + SqlToolbox toolbox + ) + { + return new SqlStatementFactory(toolbox.withEngine(sqlEngine)); + } + + @Override + public void configure(Binder binder) + { + // Do nothing, this class exists for the Provider methods + } + } } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java index 4d7a5d26bb8..1fc64ccded0 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java @@ -22,6 +22,7 @@ package org.apache.druid.sql.http; import com.google.inject.Binder; import com.google.inject.Module; import org.apache.druid.guice.Jerseys; +import org.apache.druid.guice.LazySingleton; /** * The Module responsible for providing bindings to the SQL http endpoint @@ -31,6 +32,7 @@ public class SqlHttpModule implements Module @Override public void configure(Binder binder) { + binder.bind(SqlResource.class).in(LazySingleton.class); Jerseys.addResource(binder, SqlResource.class); } } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index 28623a622e6..db7f7d4ae73 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -21,15 +21,14 @@ package org.apache.druid.sql.http; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.io.CountingOutputStream; import com.google.inject.Inject; import org.apache.calcite.plan.RelOptPlanner; import org.apache.druid.common.exception.SanitizableException; -import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.NativeQuery; +import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.logger.Logger; @@ -38,6 +37,10 @@ import org.apache.druid.query.QueryCapacityExceededException; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.QueryResource; +import org.apache.druid.server.QueryResponse; +import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthorizationUtils; @@ -52,8 +55,6 @@ import org.apache.druid.sql.SqlLifecycleManager.Cancelable; import org.apache.druid.sql.SqlPlanningException; import org.apache.druid.sql.SqlRowTransformer; import org.apache.druid.sql.SqlStatementFactory; -import org.apache.druid.sql.SqlStatementFactoryFactory; -import org.apache.druid.sql.calcite.run.NativeSqlEngine; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; @@ -63,13 +64,14 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; - import java.io.IOException; +import java.io.OutputStream; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -87,33 +89,18 @@ public class SqlResource private final SqlStatementFactory sqlStatementFactory; private final SqlLifecycleManager sqlLifecycleManager; private final ServerConfig serverConfig; + private final ResponseContextConfig responseContextConfig; + private final DruidNode selfNode; @Inject - public SqlResource( - @Json ObjectMapper jsonMapper, - AuthorizerMapper authorizerMapper, - NativeSqlEngine engine, - SqlStatementFactoryFactory sqlStatementFactoryFactory, - SqlLifecycleManager sqlLifecycleManager, - ServerConfig serverConfig - ) - { - this( - jsonMapper, - authorizerMapper, - sqlStatementFactoryFactory.factorize(engine), - sqlLifecycleManager, - serverConfig - ); - } - - @VisibleForTesting SqlResource( final ObjectMapper jsonMapper, final AuthorizerMapper authorizerMapper, - final SqlStatementFactory sqlStatementFactory, + final @NativeQuery SqlStatementFactory sqlStatementFactory, final SqlLifecycleManager sqlLifecycleManager, - final ServerConfig serverConfig + final ServerConfig serverConfig, + ResponseContextConfig responseContextConfig, + @Self DruidNode selfNode ) { this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); @@ -121,6 +108,9 @@ public class SqlResource this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory, "sqlStatementFactory"); this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager"); this.serverConfig = Preconditions.checkNotNull(serverConfig, "serverConfig"); + this.responseContextConfig = responseContextConfig; + this.selfNode = selfNode; + } @POST @@ -138,17 +128,20 @@ public class SqlResource try { Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId)); ResultSet resultSet = stmt.plan(); - final Sequence sequence = resultSet.run(); + final QueryResponse response = resultSet.run(); final SqlRowTransformer rowTransformer = resultSet.createRowTransformer(); - final Yielder yielder0 = Yielders.each(sequence); + final Yielder finalYielder = Yielders.each(response.getResults()); - try { - final Response.ResponseBuilder responseBuilder = Response - .ok( - (StreamingOutput) outputStream -> { + final Response.ResponseBuilder responseBuilder = Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream output) throws IOException, WebApplicationException + { Exception e = null; - CountingOutputStream os = new CountingOutputStream(outputStream); - Yielder yielder = yielder0; + CountingOutputStream os = new CountingOutputStream(output); + Yielder yielder = finalYielder; try (final ResultFormat.Writer writer = sqlQuery.getResultFormat() .createFormatter(os, jsonMapper)) { @@ -185,20 +178,24 @@ public class SqlResource endLifecycle(stmt, e, os.getCount()); } } - ) - .header(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId); + } + ) + .header(SQL_QUERY_ID_RESPONSE_HEADER, sqlQueryId); - if (sqlQuery.includeHeader()) { - responseBuilder.header(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE); - } + if (sqlQuery.includeHeader()) { + responseBuilder.header(SQL_HEADER_RESPONSE_HEADER, SQL_HEADER_VALUE); + } - return responseBuilder.build(); - } - catch (Throwable e) { - // make sure to close yielder if anything happened before starting to serialize the response. - yielder0.close(); - throw new RuntimeException(e); - } + QueryResource.attachResponseContextToHttpResponse( + sqlQueryId, + response.getResponseContext(), + responseBuilder, + jsonMapper, + responseContextConfig, + selfNode + ); + + return responseBuilder.build(); } catch (QueryCapacityExceededException cap) { endLifecycle(stmt, cap, -1); diff --git a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java index 2004fe9988b..0653c533a14 100644 --- a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java +++ b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java @@ -19,7 +19,6 @@ package org.apache.druid.sql; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListeningExecutorService; @@ -70,7 +69,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import javax.servlet.http.HttpServletRequest; - import java.io.IOException; import java.util.Collections; import java.util.List; @@ -159,15 +157,18 @@ public class SqlStatementTest new CalciteRulesManager(ImmutableSet.of()) ); - this.sqlStatementFactory = new SqlStatementFactoryFactory( - plannerFactory, - new NoopServiceEmitter(), - testRequestLogger, - QueryStackTests.DEFAULT_NOOP_SCHEDULER, - new AuthConfig(), - Suppliers.ofInstance(defaultQueryConfig), - new SqlLifecycleManager() - ).factorize(CalciteTests.createMockSqlEngine(walker, conglomerate)); + this.sqlStatementFactory = new SqlStatementFactory( + new SqlToolbox( + CalciteTests.createMockSqlEngine(walker, conglomerate), + plannerFactory, + new NoopServiceEmitter(), + testRequestLogger, + QueryStackTests.DEFAULT_NOOP_SCHEDULER, + new AuthConfig(), + defaultQueryConfig, + new SqlLifecycleManager() + ) + ); } @After @@ -221,7 +222,7 @@ public class SqlStatementTest DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq); ResultSet resultSet = stmt.plan(); assertTrue(resultSet.runnable()); - List results = resultSet.run().toList(); + List results = resultSet.run().getResults().toList(); assertEquals(1, results.size()); assertEquals(6L, results.get(0)[0]); assertEquals("foo", results.get(0)[1]); @@ -341,7 +342,7 @@ public class SqlStatementTest makeQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo"), request(true) ); - List results = stmt.execute().toList(); + List results = stmt.execute().getResults().toList(); assertEquals(1, results.size()); assertEquals(6L, results.get(0)[0]); assertEquals("foo", results.get(0)[1]); @@ -422,6 +423,7 @@ public class SqlStatementTest List results = stmt .execute(Collections.emptyList()) .execute() + .getResults() .toList(); assertEquals(1, results.size()); assertEquals(6L, results.get(0)[0]); diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 03763c4b5e8..7d989cae230 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -72,8 +72,6 @@ import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerFactory; -import org.apache.druid.sql.calcite.run.NativeSqlEngine; -import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; import org.apache.druid.sql.calcite.schema.DruidSchemaName; import org.apache.druid.sql.calcite.schema.NamedSchema; @@ -81,6 +79,7 @@ import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.QueryLogHook; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.sql.guice.SqlModule; import org.eclipse.jetty.server.Server; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -221,7 +220,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase binder.bind(QueryScheduler.class) .toProvider(QuerySchedulerProvider.class) .in(LazySingleton.class); - binder.bind(SqlEngine.class).to(NativeSqlEngine.class); + binder.install(new SqlModule.SqlStatementFactoryModule()); binder.bind(new TypeLiteral>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))); binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of())); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index aea6a094d3e..91719059609 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -917,7 +917,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase .auth(authenticationResult) .build() ); - Sequence results = stmt.execute(); + Sequence results = stmt.execute().getResults(); RelDataType rowType = stmt.prepareResult().getReturnedRowType(); return new Pair<>( RowSignatures.fromRelDataType(rowType.getFieldNames(), rowType), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 2bd1dc2a8aa..d614c23d5d2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -4559,7 +4559,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest QueryLifecycleFactory qlf = CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate); QueryLifecycle ql = qlf.factorize(); - Sequence seq = ql.runSimple(query, CalciteTests.SUPER_USER_AUTH_RESULT, Access.OK); + Sequence seq = ql.runSimple(query, CalciteTests.SUPER_USER_AUTH_RESULT, Access.OK).getResults(); List results = seq.toList(); Assert.assertEquals( ImmutableList.of(ResultRow.of("def")), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java index 3823bd4af7b..f49bb0c40ea 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java @@ -200,8 +200,8 @@ public class SqlVectorizedExpressionSanityTest extends InitializedNullHandlingTe ) { final PlannerResult vectorPlan = vectorPlanner.plan(); final PlannerResult nonVectorPlan = nonVectorPlanner.plan(); - final Sequence vectorSequence = vectorPlan.run(); - final Sequence nonVectorSequence = nonVectorPlan.run(); + final Sequence vectorSequence = vectorPlan.run().getResults(); + final Sequence nonVectorSequence = nonVectorPlan.run().getResults(); Yielder vectorizedYielder = Yielders.each(vectorSequence); Yielder nonVectorizedYielder = Yielders.each(nonVectorSequence); int row = 0; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java index 2576149d76d..22c004947ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java @@ -21,9 +21,9 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; import org.apache.calcite.runtime.Hook; -import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.server.QueryResponse; import org.apache.druid.sql.calcite.rel.DruidQuery; import org.apache.druid.sql.calcite.run.QueryMaker; @@ -45,7 +45,7 @@ public class TestInsertQueryMaker implements QueryMaker } @Override - public Sequence runQuery(final DruidQuery druidQuery) + public QueryResponse runQuery(final DruidQuery druidQuery) { // Don't actually execute anything, but do record information that tests will check for. @@ -53,6 +53,8 @@ public class TestInsertQueryMaker implements QueryMaker Hook.QUERY_PLAN.run(druidQuery.getQuery()); // 2) Return the dataSource and signature of the insert operation, so tests can confirm they are correct. - return Sequences.simple(ImmutableList.of(new Object[]{targetDataSource, signature})); + return QueryResponse.withEmptyContext( + Sequences.simple(ImmutableList.of(new Object[]{targetDataSource, signature})) + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java index 99caee81253..1ba95b8bef0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -53,6 +54,7 @@ import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.QueryResponse; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.Access; @@ -1089,7 +1091,7 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); // This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) - .andReturn(null); + .andReturn(QueryResponse.withEmptyContext(Sequences.empty())); EasyMock.replay(factoryMock, lifecycleMock); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 6b051e8109c..adc132d736f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -114,7 +114,7 @@ import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.SqlLifecycleManager; import org.apache.druid.sql.SqlStatementFactory; -import org.apache.druid.sql.SqlStatementFactoryFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerFactory; @@ -804,15 +804,18 @@ public class CalciteTests final AuthConfig authConfig ) { - return new SqlStatementFactoryFactory( - plannerFactory, - new ServiceEmitter("dummy", "dummy", new NoopEmitter()), - new NoopRequestLogger(), - QueryStackTests.DEFAULT_NOOP_SCHEDULER, - authConfig, - Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())), - new SqlLifecycleManager() - ).factorize(engine); + return new SqlStatementFactory( + new SqlToolbox( + engine, + plannerFactory, + new ServiceEmitter("dummy", "dummy", new NoopEmitter()), + new NoopRequestLogger(), + QueryStackTests.DEFAULT_NOOP_SCHEDULER, + authConfig, + new DefaultQueryConfig(ImmutableMap.of()), + new SqlLifecycleManager() + ) + ); } public static ObjectMapper getJsonMapper() diff --git a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java index 4b87c207f0e..12b8d5f8a91 100644 --- a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java @@ -53,6 +53,8 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QuerySchedulerProvider; +import org.apache.druid.server.ResponseContextConfig; +import org.apache.druid.server.initialization.AuthenticatorMapperModule; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.log.RequestLogger; import org.apache.druid.server.security.AuthorizerMapper; @@ -63,6 +65,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.view.DruidViewMacro; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.calcite.view.ViewManager; +import org.apache.druid.sql.http.SqlResourceTest; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; @@ -165,12 +168,16 @@ public class SqlModuleTest private Injector makeInjectorWithProperties(final Properties props) { + final SqlModule sqlModule = new SqlModule(); + sqlModule.setProps(props); + return Guice.createInjector( ImmutableList.of( new DruidGuiceExtensions(), new LifecycleModule(), new ServerModule(), new JacksonModule(), + new AuthenticatorMapperModule(), (Module) binder -> { binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); binder.bind(JsonConfigurator.class).in(LazySingleton.class); @@ -196,8 +203,9 @@ public class SqlModuleTest binder.bind(QueryScheduler.class) .toProvider(QuerySchedulerProvider.class) .in(LazySingleton.class); + binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG); }, - new SqlModule(props), + sqlModule, new TestViewManagerModule() ) ); diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java index e1053989987..6916358cb5b 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java @@ -29,11 +29,13 @@ import org.apache.druid.guice.DruidGuiceExtensions; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.JSR311Resource; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.NativeQuery; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.sql.SqlStatementFactory; -import org.apache.druid.sql.SqlStatementFactoryFactory; import org.apache.druid.sql.calcite.run.NativeSqlEngine; -import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; @@ -50,8 +52,6 @@ public class SqlHttpModuleTest { @Mock private ObjectMapper jsonMpper; - @Mock - private SqlStatementFactoryFactory sqlStatementFactoryFactory; private SqlHttpModule target; private Injector injector; @@ -59,39 +59,40 @@ public class SqlHttpModuleTest @Before public void setUp() { - EasyMock.expect(sqlStatementFactoryFactory.factorize(EasyMock.capture(Capture.newInstance()))) - .andReturn(EasyMock.mock(SqlStatementFactory.class)) - .anyTimes(); - EasyMock.replay(sqlStatementFactoryFactory); - target = new SqlHttpModule(); injector = Guice.createInjector( new LifecycleModule(), new DruidGuiceExtensions(), binder -> { binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper); - binder.bind(SqlStatementFactoryFactory.class).toInstance(sqlStatementFactoryFactory); binder.bind(AuthorizerMapper.class).toInstance(new AuthorizerMapper(Collections.emptyMap())); binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new NativeSqlEngine(null, null))); + binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(SqlResourceTest.DUMMY_DRUID_NODE); + binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG); + binder.bind(SqlStatementFactory.class) + .annotatedWith(NativeQuery.class) + .toInstance(EasyMock.mock(SqlStatementFactory.class)); }, target ); } @Test - public void testSqlResourceIsInjectedAndNotSingleton() + public void testSqlResourceIsInjectedAndSingleton() { SqlResource sqlResource = injector.getInstance(SqlResource.class); Assert.assertNotNull(sqlResource); SqlResource other = injector.getInstance(SqlResource.class); - Assert.assertNotSame(other, sqlResource); + Assert.assertSame(other, sqlResource); } @Test public void testSqlResourceIsAvailableViaJersey() { Set> jerseyResourceClasses = - injector.getInstance(Key.get(new TypeLiteral>>() {}, JSR311Resource.class)); + injector.getInstance(Key.get(new TypeLiteral>>() + { + }, JSR311Resource.class)); Assert.assertEquals(1, jerseyResourceClasses.size()); Assert.assertEquals(SqlResource.class, jerseyResourceClasses.iterator().next()); } diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 4a16c344c12..90fadf92802 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.calcite.avatica.SqlType; +import org.apache.commons.io.output.NullOutputStream; import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.exception.AllowedRegexErrorResponseTransformStrategy; import org.apache.druid.common.exception.ErrorResponseTransformStrategy; @@ -57,9 +58,13 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryUnsupportedException; import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.QueryResponse; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.log.TestRequestLogger; import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; @@ -96,10 +101,8 @@ import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -110,7 +113,6 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -130,11 +132,15 @@ import java.util.stream.Collectors; public class SqlResourceTest extends CalciteTestBase { + public static final DruidNode DUMMY_DRUID_NODE = new DruidNode("dummy", "dummy", false, 1, null, true, false); + public static final ResponseContextConfig TEST_RESPONSE_CONTEXT_CONFIG = ResponseContextConfig.newConfig(false); + private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final String DUMMY_SQL_QUERY_ID = "dummy"; // Timeout to allow (rapid) debugging, while not blocking tests with errors. private static final int WAIT_TIMEOUT_SECS = 60; - private static final Consumer NULL_ACTION = s -> {}; + private static final Consumer NULL_ACTION = s -> { + }; private static final List EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS = Arrays.asList("__time", "dim1", "dim2", "dim3", "cnt", "m1", "m2", "unique_dim1", "EXPR$8"); @@ -166,26 +172,17 @@ public class SqlResourceTest extends CalciteTestBase private final SettableSupplier> planLatchSupplier = new SettableSupplier<>(); private final SettableSupplier> executeLatchSupplier = new SettableSupplier<>(); private final SettableSupplier, Sequence>> sequenceMapFnSupplier = new SettableSupplier<>(); + private final SettableSupplier responseContextSupplier = new SettableSupplier<>(); private Consumer onExecute = NULL_ACTION; private boolean sleep; - @BeforeClass - public static void setUpClass() - { - resourceCloser = Closer.create(); - conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser); - } - - @AfterClass - public static void tearDownClass() throws IOException - { - resourceCloser.close(); - } - @Before public void setUp() throws Exception { + resourceCloser = Closer.create(); + conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser); + final QueryScheduler scheduler = new QueryScheduler( 5, ManualQueryPrioritizationStrategy.INSTANCE, @@ -265,7 +262,7 @@ public class SqlResourceTest extends CalciteTestBase defaultQueryConfig, lifecycleManager ); - sqlStatementFactory = new SqlStatementFactory() + sqlStatementFactory = new SqlStatementFactory(null) { @Override public HttpStatement httpStatement( @@ -281,6 +278,7 @@ public class SqlResourceTest extends CalciteTestBase planLatchSupplier, executeLatchSupplier, sequenceMapFnSupplier, + responseContextSupplier, onExecute ); onExecute = NULL_ACTION; @@ -304,7 +302,9 @@ public class SqlResourceTest extends CalciteTestBase CalciteTests.TEST_AUTHORIZER_MAPPER, sqlStatementFactory, lifecycleManager, - new ServerConfig() + new ServerConfig(), + TEST_RESPONSE_CONTEXT_CONFIG, + DUMMY_DRUID_NODE ); } @@ -320,6 +320,7 @@ public class SqlResourceTest extends CalciteTestBase walker = null; executorService.shutdownNow(); executorService.awaitTermination(2, TimeUnit.SECONDS); + resourceCloser.close(); } @Test @@ -358,6 +359,55 @@ public class SqlResourceTest extends CalciteTestBase Assert.assertTrue(lifecycleManager.getAll("id").isEmpty()); } + @Test + public void testCountStarWithMissingIntervalsContext() throws Exception + { + final SqlQuery sqlQuery = new SqlQuery( + "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", + null, + false, + false, + false, + // We set uncoveredIntervalsLimit more for the funzies than anything. The underlying setup of the test doesn't + // actually look at it or operate with it. Instead, we set the supplier of the ResponseContext to mock what + // we would expect from the normal query pipeline + ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id", "uncoveredIntervalsLimit", 1), + null + ); + + final ResponseContext mockRespContext = ResponseContext.createEmpty(); + mockRespContext.put(ResponseContext.Keys.instance().keyOf("uncoveredIntervals"), "2030-01-01/78149827981274-01-01"); + mockRespContext.put(ResponseContext.Keys.instance().keyOf("uncoveredIntervalsOverflowed"), "true"); + responseContextSupplier.set(mockRespContext); + + final Response response = resource.doPost(sqlQuery, makeRegularUserReq()); + + Map responseContext = JSON_MAPPER.readValue( + (String) response.getMetadata().getFirst("X-Druid-Response-Context"), + Map.class + ); + Assert.assertEquals( + ImmutableMap.of( + "uncoveredIntervals", "2030-01-01/78149827981274-01-01", + "uncoveredIntervalsOverflowed", "true" + ), + responseContext + ); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ((StreamingOutput) response.getEntity()).write(baos); + Object results = JSON_MAPPER.readValue(baos.toByteArray(), Object.class); + + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("cnt", 6, "TheFoo", "foo") + ), + results + ); + checkSqlRequestLog(true); + Assert.assertTrue(lifecycleManager.getAll("id").isEmpty()); + } + @Test public void testSqlLifecycleMetrics() throws Exception { @@ -594,7 +644,9 @@ public class SqlResourceTest extends CalciteTestBase ), doPost( new SqlQuery(query, ResultFormat.ARRAY, false, false, false, null, null), - new TypeReference>>() {} + new TypeReference>>() + { + } ).rhs ); } @@ -705,7 +757,9 @@ public class SqlResourceTest extends CalciteTestBase ), doPost( new SqlQuery(query, ResultFormat.ARRAY, true, true, true, null, null), - new TypeReference>>() {} + new TypeReference>>() + { + } ).rhs ); } @@ -729,7 +783,9 @@ public class SqlResourceTest extends CalciteTestBase ), doPost( new SqlQuery(query, ResultFormat.ARRAY, true, true, true, null, null), - new TypeReference>>() {} + new TypeReference>>() + { + } ).rhs ); } @@ -896,7 +952,9 @@ public class SqlResourceTest extends CalciteTestBase ).stream().map(transformer).collect(Collectors.toList()), doPost( new SqlQuery(query, ResultFormat.OBJECT, false, false, false, null, null), - new TypeReference>>() {} + new TypeReference>>() + { + } ).rhs ); } @@ -1102,10 +1160,10 @@ public class SqlResourceTest extends CalciteTestBase Assert.assertEquals(4, lines.size()); Assert.assertEquals(expectedHeader, JSON_MAPPER.readValue(lines.get(0), Object.class)); Assert.assertEquals( - ImmutableMap - .builder() - .put("EXPR$0", Arrays.asList(1, 2)) - .build(), + ImmutableMap + .builder() + .put("EXPR$0", Arrays.asList(1, 2)) + .build(), JSON_MAPPER.readValue(lines.get(1), Object.class) ); @@ -1338,8 +1396,8 @@ public class SqlResourceTest extends CalciteTestBase false, ImmutableMap.of(BaseQuery.SQL_QUERY_ID, "id"), null - ) - ).lhs; + ) + ).lhs; Assert.assertNotNull(exception); Assert.assertEquals(QueryUnsupportedException.ERROR_CODE, exception.getErrorCode()); @@ -1362,8 +1420,9 @@ public class SqlResourceTest extends CalciteTestBase false, ImmutableMap.of("sqlQueryId", queryId), null - ), - req); + ), + req + ); Assert.assertNotEquals(200, response.getStatus()); final MultivaluedMap headers = response.getMetadata(); Assert.assertTrue(headers.containsKey(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER)); @@ -1385,8 +1444,9 @@ public class SqlResourceTest extends CalciteTestBase false, ImmutableMap.of(), null - ), - req); + ), + req + ); Assert.assertNotEquals(200, response.getStatus()); final MultivaluedMap headers = response.getMetadata(); Assert.assertTrue(headers.containsKey(SqlResource.SQL_QUERY_ID_RESPONSE_HEADER)); @@ -1402,7 +1462,8 @@ public class SqlResourceTest extends CalciteTestBase CalciteTests.TEST_AUTHORIZER_MAPPER, sqlStatementFactory, lifecycleManager, - new ServerConfig() { + new ServerConfig() + { @Override public boolean isShowDetailedJettyErrors() { @@ -1414,7 +1475,9 @@ public class SqlResourceTest extends CalciteTestBase { return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of()); } - } + }, + TEST_RESPONSE_CONTEXT_CONFIG, + DUMMY_DRUID_NODE ); String errorMessage = "This will be supported in Druid 9999"; @@ -1428,8 +1491,8 @@ public class SqlResourceTest extends CalciteTestBase false, ImmutableMap.of("sqlQueryId", "id"), null - ) - ).lhs; + ) + ).lhs; Assert.assertNotNull(exception); Assert.assertNull(exception.getMessage()); @@ -1460,7 +1523,9 @@ public class SqlResourceTest extends CalciteTestBase { return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of()); } - } + }, + TEST_RESPONSE_CONTEXT_CONFIG, + DUMMY_DRUID_NODE ); String errorMessage = "could not assert"; @@ -1477,8 +1542,8 @@ public class SqlResourceTest extends CalciteTestBase false, ImmutableMap.of("sqlQueryId", "id"), null - ) - ).lhs; + ) + ).lhs; Assert.assertNotNull(exception); Assert.assertNull(exception.getMessage()); @@ -1653,6 +1718,24 @@ public class SqlResourceTest extends CalciteTestBase execLatch.countDown(); response = future.get(); + // The response that we get is the actual object created by the SqlResource. The StreamingOutput object that + // the SqlResource returns at the time of writing has resources opened up (the query is already running) which + // need to be closed. As such, the StreamingOutput needs to actually be called in order to cause that close + // to occur, so we must get the entity out and call `.write(OutputStream)` on it to invoke the code. + try { + ((StreamingOutput) response.getEntity()).write(NullOutputStream.NULL_OUTPUT_STREAM); + } + catch (IllegalStateException e) { + // When we actually attempt to write to the output stream, we seem to run into multi-threading issues likely + // with our test setup. Instead of figuring out how to make the thing work, given that we don't actually + // care about the response, we are going to just ensure that it was the expected exception and ignore it. + // It's possible that this test starts failing suddenly if someone changes the message of the exception, it + // should be safe to just update the expected message here too if that happens. + Assert.assertEquals( + "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or metric information from multiple threads or from an async thread, this information should explicitly be passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be reassigned explicitly", + e.getMessage() + ); + } Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus()); } @@ -1906,6 +1989,7 @@ public class SqlResourceTest extends CalciteTestBase private final SettableSupplier> planLatchSupplier; private final SettableSupplier> executeLatchSupplier; private final SettableSupplier, Sequence>> sequenceMapFnSupplier; + private final SettableSupplier responseContextSupplier; private final Consumer onExecute; private TestHttpStatement( @@ -1916,6 +2000,7 @@ public class SqlResourceTest extends CalciteTestBase SettableSupplier> planLatchSupplier, SettableSupplier> executeLatchSupplier, SettableSupplier, Sequence>> sequenceMapFnSupplier, + SettableSupplier responseContextSupplier, final Consumer onAuthorize ) { @@ -1924,13 +2009,15 @@ public class SqlResourceTest extends CalciteTestBase this.planLatchSupplier = planLatchSupplier; this.executeLatchSupplier = executeLatchSupplier; this.sequenceMapFnSupplier = sequenceMapFnSupplier; + this.responseContextSupplier = responseContextSupplier; this.onExecute = onAuthorize; } @Override protected void authorize( DruidPlanner planner, - Function, Access> authorizer) + Function, Access> authorizer + ) { if (validateAndAuthorizeLatchSupplier.get() != null) { if (validateAndAuthorizeLatchSupplier.get().rhs) { @@ -1955,14 +2042,15 @@ public class SqlResourceTest extends CalciteTestBase @Override public PlannerResult createPlan(DruidPlanner planner) { - if (planLatchSupplier.get() != null) { - if (planLatchSupplier.get().rhs) { + final NonnullPair planLatch = planLatchSupplier.get(); + if (planLatch != null) { + if (planLatch.rhs) { PlannerResult result = super.createPlan(planner); - planLatchSupplier.get().lhs.countDown(); + planLatch.lhs.countDown(); return result; } else { try { - if (!planLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) { + if (!planLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) { throw new RuntimeException("Latch timed out"); } } @@ -1989,30 +2077,37 @@ public class SqlResourceTest extends CalciteTestBase return new ResultSet(plannerResult) { @Override - public Sequence run() + public QueryResponse run() { final Function, Sequence> sequenceMapFn = Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity()); - if (executeLatchSupplier.get() != null) { - if (executeLatchSupplier.get().rhs) { - Sequence sequence = sequenceMapFn.apply(super.run()); - executeLatchSupplier.get().lhs.countDown(); - return sequence; + final NonnullPair executeLatch = executeLatchSupplier.get(); + if (executeLatch != null) { + if (executeLatch.rhs) { + final QueryResponse resp = super.run(); + Sequence sequence = sequenceMapFn.apply(resp.getResults()); + executeLatch.lhs.countDown(); + final ResponseContext respContext = resp.getResponseContext(); + respContext.merge(responseContextSupplier.get()); + return new QueryResponse(sequence, respContext); } else { try { - if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) { + if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) { throw new RuntimeException("Latch timed out"); } } catch (InterruptedException e) { throw new RuntimeException(e); } - return sequenceMapFn.apply(super.run()); } - } else { - return sequenceMapFn.apply(super.run()); } + + final QueryResponse resp = super.run(); + Sequence sequence = sequenceMapFn.apply(resp.getResults()); + final ResponseContext respContext = resp.getResponseContext(); + respContext.merge(responseContextSupplier.get()); + return new QueryResponse(sequence, respContext); } }; }