Add ability to run MSQ in Quidem tests (#16798)

* implements a JDBC facade to enable MSQ usage
* adds an !msqPlan quidem command
* adds more Guice usage to test system startup
Zoltan Haindrich 2024-08-08 06:37:06 +02:00 committed by GitHub
parent 1cf3f4bebe
commit 408702e100
35 changed files with 977 additions and 383 deletions

View File

@ -213,6 +213,37 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.hydromatic</groupId>
<artifactId>quidem</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-testkit</artifactId>
<version>${calcite.version}</version>
<exclusions>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
@ -233,11 +264,6 @@
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>

View File

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.config.Configs;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
@ -34,7 +35,7 @@ import java.util.Objects;
public class MSQResultsReport
{
/**
* Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility
* Like {@link RowSignature}, but allows duplicate column names for compatibility
* with SQL (which also allows duplicate column names in query results).
*/
private final List<ColumnAndType> signature;
@ -135,5 +136,18 @@ public class MSQResultsReport
{
return name + ":" + type;
}
public static RowSignature toRowSignature(List<ColumnAndType> columnAndTypes)
{
final RowSignature.Builder builder = RowSignature.builder();
for (MSQResultsReport.ColumnAndType columnAndType : columnAndTypes) {
builder.add(columnAndType.getName(), columnAndType.getType());
}
RowSignature rowSignature = builder.build();
if (rowSignature.size() != columnAndTypes.size()) {
throw new IllegalArgumentException("Duplicate column names are not allowed in RowSignature");
}
return rowSignature;
}
}
}
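
A minimal usage sketch of the new `ColumnAndType.toRowSignature` helper (not part of the diff; the column names and types below are made up):

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

class ToRowSignatureSketch
{
  static RowSignature example()
  {
    // Distinct column names convert cleanly; a duplicate name would make
    // toRowSignature throw IllegalArgumentException.
    return ColumnAndType.toRowSignature(
        ImmutableList.of(
            new ColumnAndType("cityName", ColumnType.STRING),
            new ColumnAndType("cnt", ColumnType.LONG)
        )
    );
  }
}
```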

View File

@ -20,7 +20,6 @@
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.guice.DruidInjectorBuilder;
@ -28,9 +27,7 @@ import org.apache.druid.msq.exec.MSQDrillWindowQueryTest.DrillWindowQueryMSQComp
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.CalciteMSQTestsHelper;
import org.apache.druid.msq.test.ExtractResultsFactory;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.msq.test.VerifyMSQSupportedNativeQueriesPredicate;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
@ -55,6 +52,7 @@ public class MSQDrillWindowQueryTest extends DrillWindowQueryTest
{
super.configureGuice(builder);
builder.addModules(CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0]));
builder.addModule(new TestMSQSqlModule());
}
@Override
@ -64,15 +62,7 @@ public class MSQDrillWindowQueryTest extends DrillWindowQueryTest
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
return injector.getInstance(MSQTaskSqlEngine.class);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.ServerInjectorBuilderTest.TestDruidModule;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.avatica.DruidMeta;
import org.apache.druid.sql.avatica.MSQDruidMeta;
public class TestMSQSqlModule extends TestDruidModule
{
@Provides
@MultiStageQuery
@LazySingleton
public SqlStatementFactory makeMSQSqlStatementFactory(
final MSQTaskSqlEngine sqlEngine,
SqlToolbox toolbox)
{
return new SqlStatementFactory(toolbox.withEngine(sqlEngine));
}
@Provides
@LazySingleton
public MSQTaskSqlEngine createEngine(
ObjectMapper queryJsonMapper,
MSQTestOverlordServiceClient indexingServiceClient)
{
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
@Provides
@LazySingleton
private MSQTestOverlordServiceClient makeOverlordServiceClient(
ObjectMapper queryJsonMapper,
Injector injector,
WorkerMemoryParameters workerMemoryParameters)
{
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return indexingServiceClient;
}
@Provides
@LazySingleton
private WorkerMemoryParameters makeWorkerMemoryParameters()
{
return MSQTestBase.makeTestWorkerMemoryParameters();
}
@Provides
@LazySingleton
public DruidMeta createMeta(MSQDruidMeta druidMeta)
{
return druidMeta;
}
}
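
The providers above form a small chain (WorkerMemoryParameters -> MSQTestOverlordServiceClient -> MSQTaskSqlEngine -> @MultiStageQuery SqlStatementFactory, plus a DruidMeta binding that resolves to MSQDruidMeta), so a test that installs this module can pull the fully wired MSQ facade straight from the injector. An illustrative sketch, assuming `injector` was built with TestMSQSqlModule installed:

```java
import com.google.inject.Injector;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.sql.avatica.DruidMeta;

class TestMSQSqlModuleSketch
{
  static void resolve(Injector injector)
  {
    // Guice builds the test overlord client and worker memory parameters on demand.
    MSQTaskSqlEngine engine = injector.getInstance(MSQTaskSqlEngine.class);
    // The DruidMeta binding resolves to MSQDruidMeta, so JDBC access runs through MSQ.
    DruidMeta meta = injector.getInstance(DruidMeta.class);
  }
}
```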

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.quidem;
import org.apache.druid.quidem.DruidQuidemTestBase;
import org.apache.druid.quidem.ProjectPathUtils;
import java.io.File;
public class MSQQuidemTest extends DruidQuidemTestBase
{
public MSQQuidemTest()
{
super();
}
@Override
protected File getTestRoot()
{
return ProjectPathUtils.getPathFromProjectRoot("extensions-core/multi-stage-query/src/test/quidem/" + getClass().getName());
}
}

View File

@ -19,63 +19,17 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.CalciteArraysQueryMSQTest.ArraysQueryMSQComponentSupplier;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.CalciteArraysQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
/**
* Runs {@link CalciteArraysQueryTest} but with MSQ engine
*/
@SqlTestFrameworkConfig.ComponentSupplier(ArraysQueryMSQComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(StandardMSQComponentSupplier.class)
public class CalciteArraysQueryMSQTest extends CalciteArraysQueryTest
{
public static class ArraysQueryMSQComponentSupplier extends ArraysComponentSupplier
{
public ArraysQueryMSQComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(
CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0])
);
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
}
@Override
protected QueryTestBuilder testBuilder()
{

View File

@ -25,7 +25,7 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.TestMSQSqlModule;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
@ -58,6 +58,7 @@ public class CalciteNestedDataQueryMSQTest extends CalciteNestedDataQueryTest
builder.addModules(
CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0])
);
builder.addModule(new TestMSQSqlModule());
}
@Override
@ -67,15 +68,7 @@ public class CalciteNestedDataQueryMSQTest extends CalciteNestedDataQueryTest
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
return injector.getInstance(MSQTaskSqlEngine.class);
}
}

View File

@ -19,26 +19,15 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.calcite.rel.RelRoot;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.CalciteJoinQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.planner.JoinAlgorithm;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import java.util.Map;
/**
* Runs {@link CalciteJoinQueryTest} but with MSQ engine.
@ -48,7 +37,6 @@ public class CalciteSelectJoinQueryMSQTest
/**
* Run all tests with {@link JoinAlgorithm#BROADCAST}.
*/
@SqlTestFrameworkConfig.ComponentSupplier(BroadcastJoinComponentSupplier.class)
public static class BroadcastTest extends Base
{
@Override
@ -57,12 +45,17 @@ public class CalciteSelectJoinQueryMSQTest
return super.testBuilder()
.verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate());
}
@Override
protected JoinAlgorithm joinAlgorithm()
{
return JoinAlgorithm.BROADCAST;
}
}
/**
* Run all tests with {@link JoinAlgorithm#SORT_MERGE}.
*/
@SqlTestFrameworkConfig.ComponentSupplier(SortMergeJoinComponentSupplier.class)
public static class SortMergeTest extends Base
{
@Override
@ -79,86 +72,31 @@ public class CalciteSelectJoinQueryMSQTest
return super.testBuilder()
.verifyNativeQueries(xs -> false);
}
@Override
protected JoinAlgorithm joinAlgorithm()
{
return JoinAlgorithm.SORT_MERGE;
}
}
@SqlTestFrameworkConfig.ComponentSupplier(StandardMSQComponentSupplier.class)
public abstract static class Base extends CalciteJoinQueryTest
{
protected abstract JoinAlgorithm joinAlgorithm();
@Override
protected QueryTestBuilder testBuilder()
{
return new QueryTestBuilder(new CalciteTestConfig(true))
Map<String, Object> defaultCtx = ImmutableMap.<String, Object>builder()
.putAll(BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT)
.put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm().toString())
.build();
return new QueryTestBuilder(new CalciteTestConfig(defaultCtx, true))
.addCustomRunner(
new ExtractResultsFactory(
() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
.skipVectorize(true);
}
}
protected static class SortMergeJoinComponentSupplier extends AbstractJoinComponentSupplier
{
public SortMergeJoinComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer, JoinAlgorithm.SORT_MERGE);
}
}
protected static class BroadcastJoinComponentSupplier extends AbstractJoinComponentSupplier
{
public BroadcastJoinComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer, JoinAlgorithm.BROADCAST);
}
}
protected abstract static class AbstractJoinComponentSupplier extends StandardComponentSupplier
{
private JoinAlgorithm joinAlgorithm;
public AbstractJoinComponentSupplier(TempDirProducer tempFolderProducer, JoinAlgorithm joinAlgorithm)
{
super(tempFolderProducer);
this.joinAlgorithm = joinAlgorithm;
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(
CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0])
);
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper)
{
@Override
public boolean featureAvailable(EngineFeature feature)
{
return super.featureAvailable(feature);
}
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
{
plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm.toString());
return super.buildQueryMakerForSelect(relRoot, plannerContext);
}
};
}
}
}

View File

@ -19,25 +19,14 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.CalciteSelectQueryMSQTest.SelectMSQComponentSupplier;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.CalciteQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import org.junit.Assert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@ -48,43 +37,9 @@ import java.util.concurrent.TimeUnit;
/**
* Runs {@link CalciteQueryTest} but with MSQ engine
*/
@SqlTestFrameworkConfig.ComponentSupplier(SelectMSQComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(StandardMSQComponentSupplier.class)
public class CalciteSelectQueryMSQTest extends CalciteQueryTest
{
public static class SelectMSQComponentSupplier extends StandardComponentSupplier
{
public SelectMSQComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0]));
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
}
@Override
protected QueryTestBuilder testBuilder()
{

View File

@ -19,14 +19,9 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
@ -35,62 +30,21 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.CalciteUnionQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Runs {@link CalciteUnionQueryTest} but with MSQ engine
*/
@SqlTestFrameworkConfig.ComponentSupplier(CalciteUnionQueryMSQTest.UnionQueryMSQComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(StandardMSQComponentSupplier.class)
public class CalciteUnionQueryMSQTest extends CalciteUnionQueryTest
{
public static class UnionQueryMSQComponentSupplier extends StandardComponentSupplier
{
public UnionQueryMSQComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(
CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0])
);
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
}
@Override
protected QueryTestBuilder testBuilder()
{

View File

@ -20,7 +20,7 @@
package org.apache.druid.msq.test;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.segment.column.RowSignature;
@ -104,7 +104,7 @@ public class ExtractResultsFactory implements QueryTestRunner.QueryRunStepFactor
}
extractedResults.add(
results.withSignatureAndResults(
convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows)
ColumnAndType.toRowSignature(payload.getResults().getSignature()), resultRows)
);
}
}
@ -114,15 +114,6 @@ public class ExtractResultsFactory implements QueryTestRunner.QueryRunStepFactor
{
return extractedResults;
}
private RowSignature convertColumnAndTypeToRowSignature(final List<MSQResultsReport.ColumnAndType> columnAndTypes)
{
final RowSignature.Builder builder = RowSignature.builder();
for (MSQResultsReport.ColumnAndType columnAndType : columnAndTypes) {
builder.add(columnAndType.getName(), columnAndType.getType());
}
return builder.build();
}
};
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.TestMSQSqlModule;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
public final class StandardMSQComponentSupplier extends StandardComponentSupplier
{
public StandardMSQComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(
CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault())
.toArray(new Module[0])
);
builder.addModule(new TestMSQSqlModule());
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector)
{
return injector.getInstance(MSQTaskSqlEngine.class);
}
}
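
With this supplier available, an existing Calcite test class only needs the ComponentSupplier annotation to run against MSQ, as CalciteSelectQueryMSQTest and CalciteUnionQueryMSQTest above now do. A sketch (the test class name is illustrative):

```java
import org.apache.druid.msq.test.StandardMSQComponentSupplier;
import org.apache.druid.sql.calcite.CalciteQueryTest;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;

// Illustrative class name; inherited test cases now plan and execute through
// MSQTaskSqlEngine, which the supplier obtains from the Guice injector.
@SqlTestFrameworkConfig.ComponentSupplier(StandardMSQComponentSupplier.class)
class ExampleCalciteMSQTest extends CalciteQueryTest
{
}
```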

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.avatica;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.commons.lang3.RegExUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.sql.hook.DruidHook;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
public class MSQDruidMeta extends DruidMeta
{
protected final MSQTestOverlordServiceClient overlordClient;
protected final ObjectMapper objectMapper;
protected final DruidHookDispatcher hookDispatcher;
@Inject
public MSQDruidMeta(
final @MultiStageQuery SqlStatementFactory sqlStatementFactory,
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final AuthenticatorMapper authMapper,
final MSQTestOverlordServiceClient overlordClient,
final ObjectMapper objectMapper,
final DruidHookDispatcher hookDispatcher)
{
super(sqlStatementFactory, config, errorHandler, authMapper);
this.overlordClient = overlordClient;
this.objectMapper = objectMapper;
this.hookDispatcher = hookDispatcher;
}
@Override
protected ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int maxRows)
{
String taskId = extractTaskId(druidStatement);
MSQTaskReportPayload payload = (MSQTaskReportPayload) overlordClient.getReportForTask(taskId)
.get(MSQTaskReport.REPORT_KEY)
.getPayload();
if (payload.getStatus().getStatus().isFailure()) {
throw new ISE(
"Query task [%s] failed due to %s",
taskId,
payload.getStatus().getErrorReport().toString()
);
}
if (!payload.getStatus().getStatus().isComplete()) {
throw new ISE("Query task [%s] should have finished", taskId);
}
final List<?> resultRows = MSQTestBase.getRows(payload.getResults());
if (resultRows == null) {
throw new ISE("Results report not present in the task's report payload");
}
try {
String str = objectMapper
.writerWithDefaultPrettyPrinter()
.writeValueAsString(payload.getStages());
str = maskMSQPlan(str, taskId);
hookDispatcher.dispatch(DruidHook.MSQ_PLAN, str);
}
catch (JsonProcessingException e) {
hookDispatcher.dispatch(DruidHook.MSQ_PLAN, "error happened during json serialization");
}
Signature signature = makeSignature(druidStatement, payload.getResults().getSignature());
@SuppressWarnings("unchecked")
Frame firstFrame = Frame.create(0, true, (List<Object>) resultRows);
return new ExecuteResult(
ImmutableList.of(
MetaResultSet.create(
druidStatement.connectionId,
druidStatement.statementId,
false,
signature,
firstFrame
)
)
);
}
private static final Map<Pattern, String> REPLACEMENT_MAP = ImmutableMap.<Pattern, String>builder()
.put(Pattern.compile("\"startTime\" : .*"), "\"startTime\" : __TIMESTAMP__")
.put(Pattern.compile("\"duration\" : .*"), "\"duration\" : __DURATION__")
.put(Pattern.compile("\"sqlQueryId\" : .*"), "\"sqlQueryId\" : __SQL_QUERY_ID__")
.build();
private String maskMSQPlan(String str, String taskId)
{
Pattern taskIdPattern = Pattern.compile(Pattern.quote(taskId));
str = RegExUtils.replaceAll(str, taskIdPattern, "<taskId>");
for (Entry<Pattern, String> entry : REPLACEMENT_MAP.entrySet()) {
str = RegExUtils.replaceAll(str, entry.getKey(), entry.getValue());
}
return str;
}
private Signature makeSignature(AbstractDruidJdbcStatement druidStatement, List<ColumnAndType> cat)
{
RowSignature sig = ColumnAndType.toRowSignature(cat);
RelDataType rowType = RowSignatures.toRelDataType(sig, DruidTypeSystem.TYPE_FACTORY);
return Meta.Signature.create(
AbstractDruidJdbcStatement.createColumnMetaData(rowType),
druidStatement.getSqlQuery().sql(),
Collections.emptyList(),
Meta.CursorFactory.ARRAY,
Meta.StatementType.SELECT
);
}
private String extractTaskId(AbstractDruidJdbcStatement druidStatement)
{
ExecuteResult r = super.doFetch(druidStatement, 2);
Object[] row = (Object[]) r.resultSets.get(0).firstFrame.rows.iterator().next();
return (String) row[0];
}
}
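
The masking above keeps quidem expected output stable across runs by replacing run-specific values (task id, start time, duration, SQL query id) with fixed placeholders, which is why the recorded plan below contains tokens like __TIMESTAMP__ and __SQL_QUERY_ID__. An illustrative sketch of a single replacement (the input value is made up):

```java
import java.util.regex.Pattern;
import org.apache.commons.lang3.RegExUtils;

class MaskingSketch
{
  static String maskStartTime(String line)
  {
    // The pattern consumes the rest of the line, including any trailing comma.
    return RegExUtils.replaceAll(line, Pattern.compile("\"startTime\" : .*"), "\"startTime\" : __TIMESTAMP__");
  }
  // maskStartTime("\"startTime\" : \"2024-08-08T06:37:06.000Z\",")
  //   returns "\"startTime\" : __TIMESTAMP__"
}
```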

View File

@ -0,0 +1,308 @@
!set plannerStrategy DECOUPLED
!use druidtest://?componentSupplier=DrillWindowQueryMSQComponentSupplier
!set outputformat mysql
select cityName, count(case when delta > 0 then channel end) as cnt, count(1) as aall
from wikipedia
where cityName in ('New York', 'Aarhus')
group by 1
order by 1;
+----------+-----+------+
| cityName | cnt | aall |
+----------+-----+------+
| Aarhus | 0 | 1 |
| New York | 7 | 13 |
+----------+-----+------+
(2 rows)
!ok
[ {
"stageNumber" : 0,
"definition" : {
"id" : "<taskId>_0",
"input" : [ {
"type" : "table",
"dataSource" : "wikipedia",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ],
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"filterFields" : [ "cityName" ]
} ],
"processor" : {
"type" : "groupByPreShuffle",
"query" : {
"queryType" : "groupBy",
"dataSource" : {
"type" : "inputNumber",
"inputNumber" : 0
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
},
"context" : {
"__user" : null,
"finalize" : true,
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
"sqlStringifyArrays" : false
}
}
},
"signature" : [ {
"name" : "d0",
"type" : "STRING"
}, {
"name" : "a0",
"type" : "LONG"
}, {
"name" : "a1",
"type" : "LONG"
} ],
"shuffleSpec" : {
"type" : "maxCount",
"clusterBy" : {
"columns" : [ {
"columnName" : "d0",
"order" : "ASCENDING"
} ]
},
"partitions" : 1,
"aggregate" : true
},
"maxWorkerCount" : 1
},
"phase" : "FINISHED",
"workerCount" : 1,
"partitionCount" : 1,
"shuffle" : "globalSort",
"output" : "localStorage",
"startTime" : __TIMESTAMP__
"duration" : __DURATION__
"sort" : true
}, {
"stageNumber" : 1,
"definition" : {
"id" : "<taskId>_1",
"input" : [ {
"type" : "stage",
"stage" : 0
} ],
"processor" : {
"type" : "groupByPostShuffle",
"query" : {
"queryType" : "groupBy",
"dataSource" : {
"type" : "inputNumber",
"inputNumber" : 0
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
},
"context" : {
"__user" : null,
"finalize" : true,
"maxParseExceptions" : 0,
"plannerStrategy" : "DECOUPLED",
"sqlQueryId" : __SQL_QUERY_ID__
"sqlStringifyArrays" : false
}
}
},
"signature" : [ {
"name" : "d0",
"type" : "STRING"
}, {
"name" : "a0",
"type" : "LONG"
}, {
"name" : "a1",
"type" : "LONG"
} ],
"maxWorkerCount" : 1
},
"phase" : "FINISHED",
"workerCount" : 1,
"partitionCount" : 1,
"output" : "localStorage",
"startTime" : __TIMESTAMP__
"duration" : __DURATION__
} ]
!msqPlan
# 227
LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0}], cnt=[COUNT($1)], aall=[COUNT()])
LogicalProject(cityName=[$2], $f1=[CASE(>($17, 0), $1, null:VARCHAR)])
LogicalFilter(condition=[OR(=($2, 'New York'), =($2, 'Aarhus'))])
LogicalTableScan(table=[[druid, wikipedia]])
!convertedPlan
LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0}], cnt=[COUNT($1) FILTER $2], aall=[COUNT()])
LogicalProject(cityName=[$2], channel=[$1], $f3=[IS TRUE(>($17, 0))])
LogicalFilter(condition=[SEARCH($2, Sarg['Aarhus':VARCHAR, 'New York':VARCHAR]:VARCHAR)])
LogicalTableScan(table=[[druid, wikipedia]])
!logicalPlan
DruidAggregate(group=[{0}], cnt=[COUNT($1) FILTER $2], aall=[COUNT()], druid=[logical])
DruidProject(cityName=[$2], channel=[$1], $f3=[IS TRUE(>($17, 0))], druid=[logical])
DruidFilter(condition=[SEARCH($2, Sarg['Aarhus':VARCHAR, 'New York':VARCHAR]:VARCHAR)])
DruidTableScan(table=[[druid, wikipedia]], druid=[logical])
!druidPlan
{
"queryType" : "groupBy",
"dataSource" : {
"type" : "table",
"name" : "wikipedia"
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"filter" : {
"type" : "inType",
"column" : "cityName",
"matchValueType" : "STRING",
"sortedValues" : [ "Aarhus", "New York" ]
},
"granularity" : {
"type" : "all"
},
"dimensions" : [ {
"type" : "default",
"dimension" : "cityName",
"outputName" : "d0",
"outputType" : "STRING"
} ],
"aggregations" : [ {
"type" : "filtered",
"aggregator" : {
"type" : "count",
"name" : "a0"
},
"filter" : {
"type" : "and",
"fields" : [ {
"type" : "not",
"field" : {
"type" : "null",
"column" : "channel"
}
}, {
"type" : "range",
"column" : "delta",
"matchValueType" : "LONG",
"lower" : 0,
"lowerOpen" : true
} ]
},
"name" : "a0"
}, {
"type" : "count",
"name" : "a1"
} ],
"limitSpec" : {
"type" : "NoopLimitSpec"
}
}
!nativePlan

View File

@ -20,7 +20,7 @@
# Quidem UT
Enables writing SQL-level tests easily.
Can be used to write tests against existing test backends (ComponentSupplier) - by doing so the testcases can be moved closer to the excercised codes.
Can be used to write tests against existing test backends (ComponentSupplier) - by doing so the testcases can be moved closer to the exercised codes.
## Usage
@ -39,8 +39,8 @@ sdk install maven
# run mvn to see if it works
mvn --version
# download druid sourcces (FIXME: change this to the main repo/branch before merging)
git clone --branch quidem-record https://github.com/kgyrtkirk/druid
# download druid sources
git clone https://github.com/apache/druid
```

View File

@ -43,7 +43,6 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.BrokerProcessingModule;
import org.apache.druid.guice.BrokerServiceModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.guice.CoordinatorDiscoveryModule;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.guice.ExpressionModule;
@ -66,7 +65,6 @@ import org.apache.druid.guice.StartupLoggingModule;
import org.apache.druid.guice.StorageNodeModule;
import org.apache.druid.guice.annotations.Client;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.http.HttpClientModule;
import org.apache.druid.guice.security.AuthenticatorModule;
import org.apache.druid.guice.security.AuthorizerModule;
@ -78,7 +76,6 @@ import org.apache.druid.initialization.TombstoneDataStorageModule;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.rpc.guice.ServiceClientModule;
@ -88,7 +85,6 @@ import org.apache.druid.server.BrokerQueryResource;
import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QuerySchedulerProvider;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.SubqueryGuardrailHelper;
@ -101,17 +97,10 @@ import org.apache.druid.server.initialization.AuthorizerMapperModule;
import org.apache.druid.server.initialization.ExternalStorageAccessSecurityModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.initialization.jetty.JettyServerModule;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.metrics.SubqueryCountStatsProvider;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.server.security.TLSCertificateCheckerModule;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
@ -158,6 +147,7 @@ public class ExposedAsBrokerQueryComponentSupplierWrapper implements QueryCompon
delegate.configureGuice(builder);
installForServerModules(builder);
builder.add(new QueryRunnerFactoryModule());
overrideModules.addAll(ExposedAsBrokerQueryComponentSupplierWrapper.brokerModules());
overrideModules.add(new BrokerTestModule());
@ -218,15 +208,6 @@ public class ExposedAsBrokerQueryComponentSupplierWrapper implements QueryCompon
@Override
protected void configure()
{
bind(AuthenticatorMapper.class).toInstance(CalciteTests.TEST_AUTHENTICATOR_MAPPER);
bind(Escalator.class).toInstance(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
bind(RequestLogger.class).toInstance(new NoopRequestLogger());
bind(String.class)
.annotatedWith(DruidSchemaName.class)
.toInstance(CalciteTests.DRUID_SCHEMA_NAME);
bind(QuerySchedulerProvider.class).in(LazySingleton.class);
bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
bind(CatalogResolver.class).toInstance(CatalogResolver.NULL_RESOLVER);
}
@Provides
@ -249,16 +230,6 @@ public class ExposedAsBrokerQueryComponentSupplierWrapper implements QueryCompon
return localProps;
}
@Provides
@LazySingleton
public SqlEngine createMockSqlEngine(
final QuerySegmentWalker walker,
final QueryRunnerFactoryConglomerate conglomerate,
@Json ObjectMapper jsonMapper)
{
return new NativeSqlEngine(CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), jsonMapper);
}
@Provides
@LazySingleton
DruidNodeDiscoveryProvider getDruidNodeDiscoveryProvider()
@ -292,7 +263,6 @@ public class ExposedAsBrokerQueryComponentSupplierWrapper implements QueryCompon
new StorageNodeModule(),
new JettyServerModule(),
new ExpressionModule(),
new BuiltInTypesModule(),
new DiscoveryModule(),
new ServerViewModule(),
new MetadataConfigModule(),
@ -321,7 +291,6 @@ public class ExposedAsBrokerQueryComponentSupplierWrapper implements QueryCompon
{
return ImmutableList.of(
new BrokerProcessingModule(),
new QueryRunnerFactoryModule(),
new SegmentWranglerModule(),
new JoinableFactoryModule(),
new BrokerServiceModule(),

View File

@ -30,12 +30,15 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
public class QuidemRecorder implements AutoCloseable, DruidHook<String>
{
private PrintStream printStream;
private File file;
private DruidHookDispatcher hookDispatcher;
private Set<String> queries = new HashSet<>();
public QuidemRecorder(URI quidemURI, DruidHookDispatcher hookDispatcher, File file)
{
@ -67,11 +70,16 @@ public class QuidemRecorder implements AutoCloseable, DruidHook<String>
public synchronized void invoke(HookKey<String> key, String query)
{
if (DruidHook.SQL.equals(key)) {
if (queries.contains(query)) {
// ignore duplicate queries
return;
}
printStream.println("# " + new Date());
printStream.print(query);
printStream.println(";");
printStream.println("!ok");
printStream.flush();
queries.add(query);
return;
}
}
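
A small sketch of the dedup added above: dispatching the same SQL text twice records it only once in the generated .iq file (recorder construction omitted; the query is made up):

```java
import org.apache.druid.quidem.QuidemRecorder;
import org.apache.druid.sql.hook.DruidHook;

class RecorderDedupSketch
{
  static void record(QuidemRecorder recorder)
  {
    recorder.invoke(DruidHook.SQL, "select 1");
    // Ignored: the identical query was already recorded.
    recorder.invoke(DruidHook.SQL, "select 1");
  }
}
```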

View File

@ -109,4 +109,11 @@ public class AuthenticationResult
{
return Objects.hash(getIdentity(), getAuthorizerName(), getAuthenticatedBy(), getContext());
}
@Override
public String toString()
{
return "AuthenticationResult [identity=" + identity + ", authorizerName=" + authorizerName + ", authenticatedBy="
+ authenticatedBy + ", context=" + context + "]";
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.initialization;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.server.security.AuthenticationResult;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class AuthenticationResultTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(AuthenticationResult.class)
.usingGetClass()
.verify();
}
@Test
public void testToString()
{
AuthenticationResult result = new AuthenticationResult(
"name",
"authorizerName",
"authorizerUser",
ImmutableMap.of("key", "value")
);
assertEquals(
"AuthenticationResult [identity=name, "
+ "authorizerName=authorizerName, "
+ "authenticatedBy=authorizerUser, "
+ "context={key=value}]",
result.toString()
);
}
}

View File

@ -116,6 +116,13 @@ public class SqlQueryPlus
return new SqlQueryPlus(sql, queryContext, parameters, authResult);
}
@Override
public String toString()
{
return "SqlQueryPlus {queryContext=" + queryContext + ", parameters=" + parameters
+ ", authResult=" + authResult + ", sql=" + sql + " }";
}
public static class Builder
{
private String sql;

View File

@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PrepareResult;
@ -59,6 +60,7 @@ public abstract class AbstractDruidJdbcStatement implements Closeable
protected final ResultFetcherFactory fetcherFactory;
protected Throwable throwable;
protected DruidJdbcResultSet resultSet;
protected SqlQueryPlus sqlQuery;
public AbstractDruidJdbcStatement(
final String connectionId,
@ -246,4 +248,9 @@ public abstract class AbstractDruidJdbcStatement implements Closeable
{
return statementId;
}
public SqlQueryPlus getSqlQuery()
{
return sqlQuery;
}
}

View File

@ -57,8 +57,8 @@ public class DruidJdbcStatement extends AbstractDruidJdbcStatement
public synchronized void execute(SqlQueryPlus queryPlus, long maxRowCount)
{
closeResultSet();
queryPlus = queryPlus.withContext(queryContext);
DirectStatement stmt = lifecycleFactory.directStatement(queryPlus);
this.sqlQuery = queryPlus.withContext(queryContext);
DirectStatement stmt = lifecycleFactory.directStatement(this.sqlQuery);
resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE, fetcherFactory);
try {
resultSet.execute();

View File

@ -395,7 +395,7 @@ public class DruidMeta extends MetaImpl
throw errorHandler.sanitize(t);
}
private ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int maxRows)
protected ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int maxRows)
{
final Signature signature = druidStatement.getSignature();
final Frame firstFrame = druidStatement.nextFrame(

View File

@ -115,7 +115,6 @@ public class CountSqlAggregator implements SqlAggregator
return null;
}
// FIXME: is-all-literal
if (args.isEmpty()) {
// COUNT(*)
return Aggregation.create(new CountAggregatorFactory(name));

View File

@ -35,6 +35,7 @@ public interface DruidHook<T>
HookKey<RelNode> LOGICAL_PLAN = new HookKey<>("logicalPlan", RelNode.class);
HookKey<RelNode> DRUID_PLAN = new HookKey<>("druidPlan", RelNode.class);
HookKey<String> SQL = new HookKey<>("sql", String.class);
HookKey<String> MSQ_PLAN = new HookKey<>("msqPlan", String.class);
@Immutable
class HookKey<T>

View File

@ -20,14 +20,9 @@
package org.apache.druid.quidem;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import org.apache.calcite.avatica.server.AbstractAvaticaHandler;
import org.apache.druid.guice.DruidInjectorBuilder;
@ -36,40 +31,26 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.avatica.AvaticaMonitor;
import org.apache.druid.sql.avatica.DruidAvaticaJsonHandler;
import org.apache.druid.sql.avatica.DruidMeta;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig.ConfigurationInstance;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig.SqlTestFrameworkConfigStore;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.SqlTestFramework.Builder;
import org.apache.druid.sql.calcite.util.SqlTestFramework.PlannerComponentSupplier;
import org.apache.druid.sql.calcite.util.SqlTestFramework.QueryComponentSupplier;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.server.Server;
@ -241,31 +222,12 @@ public class DruidAvaticaTestDriver implements Driver
public void configureGuice(DruidInjectorBuilder builder)
{
delegate.configureGuice(builder);
TestRequestLogger testRequestLogger = new TestRequestLogger();
builder.addModule(connectionModule);
builder.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
binder.bind(AuthenticatorMapper.class).toInstance(CalciteTests.TEST_AUTHENTICATOR_MAPPER);
binder.bind(AuthorizerMapper.class).toInstance(CalciteTests.TEST_AUTHORIZER_MAPPER);
binder.bind(Escalator.class).toInstance(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
binder.bind(RequestLogger.class).toInstance(testRequestLogger);
binder.bind(String.class)
.annotatedWith(DruidSchemaName.class)
.toInstance(CalciteTests.DRUID_SCHEMA_NAME);
binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class);
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
binder.bind(QueryScheduler.class)
.toProvider(QuerySchedulerProvider.class)
.in(LazySingleton.class);
binder.install(new SqlModule.SqlStatementFactoryModule());
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>()
{
}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
binder.bind(CatalogResolver.class).toInstance(CatalogResolver.NULL_RESOLVER);
}
);
}

View File

@ -65,6 +65,9 @@ public class DruidQuidemCommandHandler implements CommandHandler
if (line.startsWith("nativePlan")) {
return new NativePlanCommand(lines, content);
}
if (line.startsWith("msqPlan")) {
return new MSQPlanCommand(lines, content);
}
return null;
}
@ -117,6 +120,11 @@ public class DruidQuidemCommandHandler implements CommandHandler
}
}
protected final DruidHookDispatcher unwrapDruidHookDispatcher(Context x)
{
return DruidConnectionExtras.unwrapOrThrow(x.connection()).getDruidHookDispatcher();
}
protected abstract void executeExplain(Context x) throws Exception;
}
@ -186,10 +194,33 @@ public class DruidQuidemCommandHandler implements CommandHandler
x.echo(ImmutableList.of(str));
}
}
}
protected final DruidHookDispatcher unwrapDruidHookDispatcher(Context x)
/**
* Handles plan commands captured via {@link Hook}.
*/
abstract static class AbstractStringCaptureCommand extends AbstractPlanCommand
{
HookKey<String> hook;
AbstractStringCaptureCommand(List<String> lines, List<String> content, DruidHook.HookKey<String> hook)
{
return DruidConnectionExtras.unwrapOrThrow(x.connection()).getDruidHookDispatcher();
super(lines, content);
this.hook = hook;
}
@Override
protected final void executeExplain(Context x) throws IOException
{
DruidHookDispatcher dhp = unwrapDruidHookDispatcher(x);
List<String> logged = new ArrayList<>();
try (Closeable unhook = dhp.withHook(hook, (key, relNode) -> {
logged.add(relNode);
})) {
executeQuery(x);
}
x.echo(logged);
}
}
@ -216,4 +247,11 @@ public class DruidQuidemCommandHandler implements CommandHandler
super(lines, content, DruidHook.CONVERTED_PLAN);
}
}
static class MSQPlanCommand extends AbstractStringCaptureCommand
{
MSQPlanCommand(List<String> lines, List<String> content)
{
super(lines, content, DruidHook.MSQ_PLAN);
}
}
}

View File

@ -74,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.fail;
* <li>Copy over the .iq.out to .iq to accept the changes</li>
* </ol>
*
* To shorten the above 2 steps
* To shorten the above 2 steps you can run the test with the system property quidem.overwrite=true
*
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.quidem;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.ServerInjectorBuilderTest.TestDruidModule;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QuerySchedulerProvider;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.guice.SqlModule;
public class TestSqlModule extends TestDruidModule
{
@Override
public void configure(Binder binder)
{
binder.install(new SqlModule.SqlStatementFactoryModule());
binder.bind(String.class)
.annotatedWith(DruidSchemaName.class)
.toInstance(CalciteTests.DRUID_SCHEMA_NAME);
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>()
{
}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
TestRequestLogger testRequestLogger = new TestRequestLogger();
binder.bind(RequestLogger.class).toInstance(testRequestLogger);
binder.bind(CatalogResolver.class).toInstance(CatalogResolver.NULL_RESOLVER);
binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class);
binder.bind(QueryScheduler.class)
.toProvider(QuerySchedulerProvider.class)
.in(LazySingleton.class);
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
binder.bind(AuthenticatorMapper.class).toInstance(CalciteTests.TEST_AUTHENTICATOR_MAPPER);
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(Escalator.class).toInstance(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
}
}

View File

@ -887,6 +887,12 @@ public class BaseCalciteQueryTest extends CalciteTestBase
this.isRunningMSQ = isRunningMSQ;
}
public CalciteTestConfig(Map<String, Object> baseQueryContext, boolean isRunningMSQ)
{
this(baseQueryContext);
this.isRunningMSQ = isRunningMSQ;
}
public CalciteTestConfig(Map<String, Object> baseQueryContext)
{
Preconditions.checkNotNull(baseQueryContext, "baseQueryContext is null");

View File

@ -24,8 +24,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.avatica.SqlType;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -70,7 +68,6 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.sql.calcite.CalciteArraysQueryTest.ArraysComponentSupplier;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
@ -86,7 +83,7 @@ import java.util.Map;
/**
* Tests for array functions and array types
*/
@SqlTestFrameworkConfig.ComponentSupplier(ArraysComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(StandardComponentSupplier.class)
public class CalciteArraysQueryTest extends BaseCalciteQueryTest
{
private static final Map<String, Object> QUERY_CONTEXT_UNNEST =
@ -119,21 +116,6 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
}
}
protected static class ArraysComponentSupplier extends StandardComponentSupplier
{
public ArraysComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModule(new BuiltInTypesModule());
}
}
// test some query stuffs, sort of limited since no native array column types so either need to use constructor or
// array aggregator
@Test

View File

@ -821,8 +821,11 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@MethodSource("provideQueryContexts")
@ParameterizedTest(name = "{0}")
public void testFilterAndGroupByLookupUsingJoinOperatorWithNotFilter(Map<String, Object> queryContext)
{
assumeFalse(
isRunningMSQ() && isSortBasedJoin() && NullHandling.replaceWithDefault(),
"test disabled; returns incorrect results in this mode"
);
// Cannot vectorize JOIN operator.
cannotVectorize();
@ -5645,6 +5648,10 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
@ParameterizedTest(name = "{0}")
public void testRegressionFilteredAggregatorsSubqueryJoins(Map<String, Object> queryContext)
{
assumeFalse(
isRunningMSQ() && isSortBasedJoin() && NullHandling.replaceWithDefault(),
"test disabled; returns incorrect results in this mode"
);
assumeFalse(testBuilder().isDecoupledMode() && NullHandling.replaceWithDefault(), "not support in decoupled mode");
cannotVectorize();

View File

@ -196,7 +196,6 @@ public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModule(new BuiltInTypesModule());
}
@SuppressWarnings("resource")

View File

@ -93,24 +93,6 @@ public abstract class CalciteTestBase
}
}
// FIXME remove
public TempFolderOverTempDir temporaryFolder = new TempFolderOverTempDir();
public class TempFolderOverTempDir
{
public File newFolder()
{
return newTempFolder("unknown");
}
public File newFolder(String string)
{
return newTempFolder(string);
}
}
/**
* @deprecated prefer to make {@link DruidExpression} directly to ensure expression tests accurately test the full
* expression structure, this method is just to have a convenient way to fix a very large number of existing tests

View File

@ -20,12 +20,14 @@
package org.apache.druid.sql.calcite.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.LazySingleton;
@ -42,6 +44,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.quidem.TestSqlModule;
import org.apache.druid.segment.DefaultColumnFormatConfig;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.QueryLifecycle;
@ -77,6 +80,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@ -547,15 +551,29 @@ public class SqlTestFramework
private class TestSetupModule implements DruidModule
{
private final Builder builder;
private final List<DruidModule> subModules = Arrays.asList(new BuiltInTypesModule(), new TestSqlModule());
public TestSetupModule(Builder builder)
{
this.builder = builder;
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
ImmutableList.Builder<com.fasterxml.jackson.databind.Module> builder = ImmutableList.builder();
for (DruidModule druidModule : subModules) {
builder.addAll(druidModule.getJacksonModules());
}
return builder.build();
}
@Override
public void configure(Binder binder)
{
for (DruidModule module : subModules) {
binder.install(module);
}
binder.bind(DruidOperatorTable.class).in(LazySingleton.class);
binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT);
binder.bind(DefaultColumnFormatConfig.class).toInstance(new DefaultColumnFormatConfig(null, null));