mirror of https://github.com/apache/druid.git
Port Calcite's tests to run with MSQ (#13625)
* SQL test framework extensions * Capture planner artifacts: logical plan, etc. * Planner test builder validates the logical plan * Validation for the SQL resut schema (we already have validation for the Druid row signature) * Better Guice integration: properties, reuse Guice modules * Avoid need for hand-coded expr, macro tables * Retire some of the test-specific query component creation * Fix query log hook race condition Co-authored-by: Paul Rogers <progers@apache.org>
This commit is contained in:
parent
fb26a1093d
commit
a516eb1a41
|
@ -22,6 +22,7 @@ package org.apache.druid.msq.sql;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.calcite.runtime.Hook;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
import org.apache.calcite.util.Pair;
|
||||
import org.apache.druid.common.guava.FutureUtils;
|
||||
|
@ -108,6 +109,7 @@ public class MSQTaskQueryMaker implements QueryMaker
|
|||
@Override
|
||||
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
|
||||
{
|
||||
Hook.QUERY_PLAN.run(druidQuery.getQuery());
|
||||
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
|
||||
|
||||
QueryContext queryContext = plannerContext.queryContext();
|
||||
|
|
|
@ -141,6 +141,11 @@ public class MSQTaskSqlEngine implements SqlEngine
|
|||
);
|
||||
}
|
||||
|
||||
public OverlordClient overlordClient()
|
||||
{
|
||||
return overlordClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryMaker buildQueryMakerForInsert(
|
||||
final String targetDataSource,
|
||||
|
|
|
@ -192,7 +192,7 @@ public class MSQFaultsTest extends MSQTestBase
|
|||
.add("__time", ColumnType.LONG)
|
||||
.build();
|
||||
|
||||
File file = MSQTestFileUtils.generateTemporaryNdJsonFile(30000, 1);
|
||||
File file = MSQTestFileUtils.generateTemporaryNdJsonFile(temporaryFolder, 30000, 1);
|
||||
String filePathAsJson = queryFramework().queryJsonMapper().writeValueAsString(file.getAbsolutePath());
|
||||
|
||||
testIngestQuery().setSql(" insert into foo1 SELECT\n"
|
||||
|
@ -325,7 +325,7 @@ public class MSQFaultsTest extends MSQTestBase
|
|||
|
||||
final int numFiles = 20000;
|
||||
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
String externalFiles = String.join(", ", Collections.nCopies(numFiles, toReadFileNameAsJson));
|
||||
|
|
|
@ -106,7 +106,7 @@ public class MSQInsertTest extends MSQTestBase
|
|||
@Test
|
||||
public void testInsertOnExternalDataSource() throws IOException
|
||||
{
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
RowSignature rowSignature = RowSignature.builder()
|
||||
|
@ -379,7 +379,7 @@ public class MSQInsertTest extends MSQTestBase
|
|||
@Test
|
||||
public void testRollUpOnExternalDataSource() throws IOException
|
||||
{
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
RowSignature rowSignature = RowSignature.builder()
|
||||
|
@ -416,7 +416,7 @@ public class MSQInsertTest extends MSQTestBase
|
|||
@Test()
|
||||
public void testRollUpOnExternalDataSourceWithCompositeKey() throws IOException
|
||||
{
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
RowSignature rowSignature = RowSignature.builder()
|
||||
|
@ -552,7 +552,7 @@ public class MSQInsertTest extends MSQTestBase
|
|||
@Test
|
||||
public void testInsertWithTooLargeRowShouldThrowException() throws IOException
|
||||
{
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
Mockito.doReturn(500).when(workerMemoryParameters).getLargeFrameSize();
|
||||
|
|
|
@ -144,7 +144,7 @@ public class MSQReplaceTest extends MSQTestBase
|
|||
.add("__time", ColumnType.LONG)
|
||||
.add("cnt", ColumnType.LONG).build();
|
||||
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
testIngestQuery().setSql(" REPLACE INTO foo1 OVERWRITE ALL SELECT "
|
||||
|
@ -205,7 +205,7 @@ public class MSQReplaceTest extends MSQTestBase
|
|||
.add("__time", ColumnType.LONG)
|
||||
.add("user", ColumnType.STRING).build();
|
||||
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
testIngestQuery().setSql(
|
||||
|
|
|
@ -774,7 +774,7 @@ public class MSQSelectTest extends MSQTestBase
|
|||
@Test
|
||||
public void testExternSelect1() throws IOException
|
||||
{
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
|
||||
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
RowSignature rowSignature = RowSignature.builder()
|
||||
|
@ -1328,7 +1328,7 @@ public class MSQSelectTest extends MSQTestBase
|
|||
@Test
|
||||
public void testMultiValueStringWithIncorrectType() throws IOException
|
||||
{
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable-mv-string-array.json");
|
||||
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/unparseable-mv-string-array.json");
|
||||
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
|
||||
|
||||
RowSignature rowSignature = RowSignature.builder()
|
||||
|
|
|
@ -65,7 +65,7 @@ public class MSQWarningsTest extends MSQTestBase
|
|||
@Before
|
||||
public void setUp3() throws IOException
|
||||
{
|
||||
File tempFile = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable.gz");
|
||||
File tempFile = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/unparseable.gz");
|
||||
|
||||
// Rename the file and the file's extension from .tmp to .gz to prevent issues with 'parsing' the file
|
||||
toRead = new File(tempFile.getParentFile(), "unparseable.gz");
|
||||
|
|
|
@ -0,0 +1,303 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.test;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
import org.apache.druid.data.input.impl.LongDimensionSchema;
|
||||
import org.apache.druid.data.input.impl.StringDimensionSchema;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
import org.apache.druid.guice.GuiceInjectors;
|
||||
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
|
||||
import org.apache.druid.guice.JoinableFactoryModule;
|
||||
import org.apache.druid.guice.annotations.Self;
|
||||
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
|
||||
import org.apache.druid.msq.guice.MSQIndexingModule;
|
||||
import org.apache.druid.msq.querykit.DataSegmentProvider;
|
||||
import org.apache.druid.msq.querykit.LazyResourceHolder;
|
||||
import org.apache.druid.query.DruidProcessingConfig;
|
||||
import org.apache.druid.query.ForwardingQueryProcessingPool;
|
||||
import org.apache.druid.query.QueryProcessingPool;
|
||||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import org.apache.druid.query.groupby.GroupByQueryConfig;
|
||||
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
|
||||
import org.apache.druid.query.groupby.TestGroupByBuffers;
|
||||
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
|
||||
import org.apache.druid.query.lookup.LookupReferencesManager;
|
||||
import org.apache.druid.segment.IndexBuilder;
|
||||
import org.apache.druid.segment.IndexIO;
|
||||
import org.apache.druid.segment.QueryableIndex;
|
||||
import org.apache.druid.segment.QueryableIndexStorageAdapter;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import org.apache.druid.segment.loading.DataSegmentPusher;
|
||||
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
|
||||
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
|
||||
import org.apache.druid.segment.loading.SegmentCacheManager;
|
||||
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
|
||||
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
|
||||
import org.apache.druid.server.SegmentManager;
|
||||
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
|
||||
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
|
||||
import org.apache.druid.sql.calcite.util.CalciteTests;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
|
||||
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
|
||||
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS;
|
||||
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2;
|
||||
|
||||
/**
|
||||
* Helper class aiding in wiring up the Guice bindings required for MSQ engine to work with the Calcite's tests
|
||||
*/
|
||||
public class CalciteMSQTestsHelper
|
||||
{
|
||||
public static List<Module> fetchModules(
|
||||
TemporaryFolder temporaryFolder,
|
||||
TestGroupByBuffers groupByBuffers
|
||||
)
|
||||
{
|
||||
|
||||
Module customBindings =
|
||||
binder -> {
|
||||
final LookupReferencesManager lookupReferencesManager =
|
||||
EasyMock.createStrictMock(LookupReferencesManager.class);
|
||||
EasyMock.expect(lookupReferencesManager.getAllLookupNames()).andReturn(Collections.emptySet()).anyTimes();
|
||||
EasyMock.replay(lookupReferencesManager);
|
||||
binder.bind(LookupReferencesManager.class).toInstance(lookupReferencesManager);
|
||||
binder.bind(AppenderatorsManager.class).toProvider(() -> null);
|
||||
|
||||
// Requirements of JoinableFactoryModule
|
||||
binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class));
|
||||
|
||||
binder.bind(new TypeLiteral<Set<NodeRole>>()
|
||||
{
|
||||
}).annotatedWith(Self.class).toInstance(ImmutableSet.of(NodeRole.PEON));
|
||||
|
||||
DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
|
||||
{
|
||||
@Override
|
||||
public String getFormatString()
|
||||
{
|
||||
return "test";
|
||||
}
|
||||
};
|
||||
binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig);
|
||||
binder.bind(QueryProcessingPool.class)
|
||||
.toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
|
||||
|
||||
// Select queries donot require this
|
||||
Injector dummyInjector = GuiceInjectors.makeStartupInjectorWithModules(
|
||||
ImmutableList.of(
|
||||
binder1 -> {
|
||||
binder1.bind(ExprMacroTable.class).toInstance(CalciteTests.createExprMacroTable());
|
||||
binder1.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT);
|
||||
}
|
||||
)
|
||||
);
|
||||
ObjectMapper testMapper = MSQTestBase.setupObjectMapper(dummyInjector);
|
||||
IndexIO indexIO = new IndexIO(testMapper, () -> 0);
|
||||
SegmentCacheManager segmentCacheManager = null;
|
||||
try {
|
||||
segmentCacheManager = new SegmentCacheManagerFactory(testMapper).manufacturate(temporaryFolder.newFolder(
|
||||
"test"));
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig();
|
||||
MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO);
|
||||
try {
|
||||
config.storageDirectory = temporaryFolder.newFolder("localsegments");
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "Unable to create folder");
|
||||
}
|
||||
binder.bind(DataSegmentPusher.class).toProvider(() -> new MSQTestDelegateDataSegmentPusher(
|
||||
new LocalDataSegmentPusher(config),
|
||||
segmentManager
|
||||
));
|
||||
binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer());
|
||||
binder.bind(DataSegmentProvider.class)
|
||||
.toInstance((dataSegment, channelCounters) ->
|
||||
new LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
|
||||
|
||||
GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
|
||||
binder.bind(GroupByStrategySelector.class)
|
||||
.toInstance(GroupByQueryRunnerTest.makeQueryRunnerFactory(groupByQueryConfig, groupByBuffers)
|
||||
.getStrategySelector());
|
||||
};
|
||||
return ImmutableList.of(
|
||||
customBindings,
|
||||
new IndexingServiceTuningConfigModule(),
|
||||
new JoinableFactoryModule(),
|
||||
new MSQExternalDataSourceModule(),
|
||||
new MSQIndexingModule()
|
||||
);
|
||||
}
|
||||
|
||||
private static Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId segmentId)
|
||||
{
|
||||
final TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
try {
|
||||
temporaryFolder.create();
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
final QueryableIndex index;
|
||||
try {
|
||||
switch (segmentId.getDataSource()) {
|
||||
case DATASOURCE1:
|
||||
IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder()
|
||||
.withMetrics(
|
||||
new CountAggregatorFactory("cnt"),
|
||||
new FloatSumAggregatorFactory("m1", "m1"),
|
||||
new DoubleSumAggregatorFactory("m2", "m2"),
|
||||
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
|
||||
)
|
||||
.withRollup(false)
|
||||
.build();
|
||||
index = IndexBuilder
|
||||
.create()
|
||||
.tmpDir(new File(temporaryFolder.newFolder(), "1"))
|
||||
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
|
||||
.schema(foo1Schema)
|
||||
.rows(ROWS1)
|
||||
.buildMMappedIndex();
|
||||
break;
|
||||
case DATASOURCE2:
|
||||
final IncrementalIndexSchema indexSchemaDifferentDim3M1Types = new IncrementalIndexSchema.Builder()
|
||||
.withDimensionsSpec(
|
||||
new DimensionsSpec(
|
||||
ImmutableList.of(
|
||||
new StringDimensionSchema("dim1"),
|
||||
new StringDimensionSchema("dim2"),
|
||||
new LongDimensionSchema("dim3")
|
||||
)
|
||||
)
|
||||
)
|
||||
.withMetrics(
|
||||
new CountAggregatorFactory("cnt"),
|
||||
new LongSumAggregatorFactory("m1", "m1"),
|
||||
new DoubleSumAggregatorFactory("m2", "m2"),
|
||||
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
|
||||
)
|
||||
.withRollup(false)
|
||||
.build();
|
||||
index = IndexBuilder
|
||||
.create()
|
||||
.tmpDir(new File(temporaryFolder.newFolder(), "2"))
|
||||
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
|
||||
.schema(indexSchemaDifferentDim3M1Types)
|
||||
.rows(ROWS2)
|
||||
.buildMMappedIndex();
|
||||
break;
|
||||
case DATASOURCE3:
|
||||
index = IndexBuilder
|
||||
.create()
|
||||
.tmpDir(new File(temporaryFolder.newFolder(), "3"))
|
||||
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
|
||||
.schema(INDEX_SCHEMA_NUMERIC_DIMS)
|
||||
.rows(ROWS1_WITH_NUMERIC_DIMS)
|
||||
.buildMMappedIndex();
|
||||
break;
|
||||
default:
|
||||
throw new ISE("Cannot query segment %s in test runner", segmentId);
|
||||
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "Unable to load index for segment %s", segmentId);
|
||||
}
|
||||
Segment segment = new Segment()
|
||||
{
|
||||
@Override
|
||||
public SegmentId getId()
|
||||
{
|
||||
return segmentId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interval getDataInterval()
|
||||
{
|
||||
return segmentId.getInterval();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public QueryableIndex asQueryableIndex()
|
||||
{
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageAdapter asStorageAdapter()
|
||||
{
|
||||
return new QueryableIndexStorageAdapter(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
}
|
||||
};
|
||||
return new Supplier<Pair<Segment, Closeable>>()
|
||||
{
|
||||
@Override
|
||||
public Pair<Segment, Closeable> get()
|
||||
{
|
||||
return new Pair<>(segment, Closer.create());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.test;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Module;
|
||||
import org.apache.druid.guice.DruidInjectorBuilder;
|
||||
import org.apache.druid.msq.exec.WorkerMemoryParameters;
|
||||
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
|
||||
import org.apache.druid.query.groupby.TestGroupByBuffers;
|
||||
import org.apache.druid.server.QueryLifecycleFactory;
|
||||
import org.apache.druid.sql.calcite.CalciteQueryTest;
|
||||
import org.apache.druid.sql.calcite.QueryTestBuilder;
|
||||
import org.apache.druid.sql.calcite.run.SqlEngine;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
|
||||
/**
|
||||
* Runs {@link CalciteQueryTest} but with MSQ engine
|
||||
*/
|
||||
public class CalciteSelectQueryTestMSQ extends CalciteQueryTest
|
||||
{
|
||||
|
||||
private MSQTestOverlordServiceClient indexingServiceClient;
|
||||
private TestGroupByBuffers groupByBuffers;
|
||||
|
||||
@Before
|
||||
public void setup2()
|
||||
{
|
||||
groupByBuffers = TestGroupByBuffers.createDefault();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown2()
|
||||
{
|
||||
groupByBuffers.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureGuice(DruidInjectorBuilder builder)
|
||||
{
|
||||
super.configureGuice(builder);
|
||||
builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0]));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SqlEngine createEngine(
|
||||
QueryLifecycleFactory qlf,
|
||||
ObjectMapper queryJsonMapper,
|
||||
Injector injector
|
||||
)
|
||||
{
|
||||
final WorkerMemoryParameters workerMemoryParameters =
|
||||
WorkerMemoryParameters.createInstance(
|
||||
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
|
||||
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
|
||||
2,
|
||||
10,
|
||||
2
|
||||
);
|
||||
indexingServiceClient = new MSQTestOverlordServiceClient(
|
||||
queryJsonMapper,
|
||||
injector,
|
||||
new MSQTestTaskActionClient(queryJsonMapper),
|
||||
workerMemoryParameters
|
||||
);
|
||||
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected QueryTestBuilder testBuilder()
|
||||
{
|
||||
return new QueryTestBuilder(new CalciteTestConfig(true))
|
||||
.addCustomVerification(new VerifyMSQSupportedNativeQueriesFactory())
|
||||
.addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
|
||||
.skipVectorize(true)
|
||||
.verifyNativeQueries(false)
|
||||
.msqCompatible(msqCompatible);
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testCannotInsertWithNativeEngine()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testCannotReplaceWithNativeEngine()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testRequireTimeConditionSimpleQueryNegative()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testRequireTimeConditionSubQueryNegative()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testRequireTimeConditionSemiJoinNegative()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testExactCountDistinctWithFilter()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testUnplannableQueries()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testMaxSubqueryRows()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
public void testQueryWithMoreThanMaxNumericInFilter()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.test;
|
||||
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.msq.indexing.report.MSQTaskReport;
|
||||
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.sql.calcite.QueryTestBuilder;
|
||||
import org.apache.druid.sql.calcite.QueryTestRunner;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* {@link QueryTestRunner.QueryRunStep} that extracts the results from the reports so that they can be used in the
|
||||
* subsequent verification steps
|
||||
*/
|
||||
public class ExtractResultsFactory implements QueryTestRunner.QueryRunStepFactory
|
||||
{
|
||||
private final Supplier<MSQTestOverlordServiceClient> overlordClientSupplier;
|
||||
|
||||
/**
|
||||
* @param overlordClientSupplier Supplier to the overlord client which contains the reports for the test run. A
|
||||
* supplier is required because the step is instantiated when the overlord client might
|
||||
* not be instantiated however when we fetch the results, the overlord client must be
|
||||
* instantiated by the query framework
|
||||
*/
|
||||
public ExtractResultsFactory(Supplier<MSQTestOverlordServiceClient> overlordClientSupplier)
|
||||
{
|
||||
this.overlordClientSupplier = overlordClientSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryTestRunner.QueryRunStep make(QueryTestBuilder builder, QueryTestRunner.BaseExecuteQuery execStep)
|
||||
{
|
||||
return new QueryTestRunner.BaseExecuteQuery(builder)
|
||||
{
|
||||
final List<QueryTestRunner.QueryResults> extractedResults = new ArrayList<>();
|
||||
|
||||
final MSQTestOverlordServiceClient overlordClient = overlordClientSupplier.get();
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
for (QueryTestRunner.QueryResults results : execStep.results()) {
|
||||
List<Object[]> queryResults = results.results;
|
||||
if (queryResults == null) {
|
||||
extractedResults.add(results);
|
||||
return;
|
||||
}
|
||||
// For a single run, only a single query results containing a single row must be fetched, since UNION is not
|
||||
// currently supported by MSQ
|
||||
Assert.assertEquals(
|
||||
"Found multiple rows, cannot extract the actual results from the reports",
|
||||
1,
|
||||
queryResults.size()
|
||||
);
|
||||
Object[] row = queryResults.get(0);
|
||||
Assert.assertEquals(
|
||||
"Found multiple taskIds, cannot extract the actual results from the reports",
|
||||
1,
|
||||
row.length
|
||||
);
|
||||
String taskId = row[0].toString();
|
||||
MSQTaskReportPayload payload = (MSQTaskReportPayload) overlordClient.getReportForTask(taskId)
|
||||
.get(MSQTaskReport.REPORT_KEY)
|
||||
.getPayload();
|
||||
if (payload.getStatus().getStatus().isFailure()) {
|
||||
throw new ISE(
|
||||
"Query task [%s] failed due to %s",
|
||||
taskId,
|
||||
payload.getStatus().getErrorReport().toString()
|
||||
);
|
||||
}
|
||||
|
||||
if (!payload.getStatus().getStatus().isComplete()) {
|
||||
throw new ISE("Query task [%s] should have finished", taskId);
|
||||
}
|
||||
Optional<Pair<RowSignature, List<Object[]>>> signatureListPair = MSQTestBase.getSignatureWithRows(payload.getResults());
|
||||
if (!signatureListPair.isPresent()) {
|
||||
throw new ISE("Results report not present in the task's report payload");
|
||||
}
|
||||
extractedResults.add(results.withResults(signatureListPair.get().rhs));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<QueryTestRunner.QueryResults> results()
|
||||
{
|
||||
return extractedResults;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -637,7 +637,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
|
|||
return new IngestTester();
|
||||
}
|
||||
|
||||
private ObjectMapper setupObjectMapper(Injector injector)
|
||||
public static ObjectMapper setupObjectMapper(Injector injector)
|
||||
{
|
||||
ObjectMapper mapper = injector.getInstance(ObjectMapper.class)
|
||||
.registerModules(new SimpleModule(IndexingServiceTuningConfigModule.class.getSimpleName())
|
||||
|
@ -746,7 +746,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
private Optional<Pair<RowSignature, List<Object[]>>> getSignatureWithRows(MSQResultsReport resultsReport)
|
||||
public static Optional<Pair<RowSignature, List<Object[]>>> getSignatureWithRows(MSQResultsReport resultsReport)
|
||||
{
|
||||
if (resultsReport == null) {
|
||||
return Optional.empty();
|
||||
|
@ -1241,5 +1241,4 @@ public class MSQTestBase extends BaseCalciteQueryTest
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.io.ByteStreams;
|
||||
import org.apache.druid.java.util.common.IOE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -38,9 +38,9 @@ public class MSQTestFileUtils
|
|||
/**
|
||||
* Helper method that copies a resource to a temporary file, then returns it.
|
||||
*/
|
||||
public static File getResourceAsTemporaryFile(Object object, final String resource) throws IOException
|
||||
public static File getResourceAsTemporaryFile(TemporaryFolder temporaryFolder, Object object, final String resource) throws IOException
|
||||
{
|
||||
final File file = BaseCalciteQueryTest.temporaryFolder.newFile();
|
||||
final File file = temporaryFolder.newFile();
|
||||
final InputStream stream = object.getClass().getResourceAsStream(resource);
|
||||
|
||||
if (stream == null) {
|
||||
|
@ -55,9 +55,9 @@ public class MSQTestFileUtils
|
|||
* Helper method that populates a temporary file with {@code numRows} rows and {@code numColumns} columns where the
|
||||
* first column is a string 'timestamp' while the rest are string columns with junk value
|
||||
*/
|
||||
public static File generateTemporaryNdJsonFile(final int numRows, final int numColumns) throws IOException
|
||||
public static File generateTemporaryNdJsonFile(TemporaryFolder temporaryFolder, final int numRows, final int numColumns) throws IOException
|
||||
{
|
||||
final File file = BaseCalciteQueryTest.temporaryFolder.newFile();
|
||||
final File file = temporaryFolder.newFile();
|
||||
for (int currentRow = 0; currentRow < numRows; ++currentRow) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{");
|
||||
|
|
|
@ -20,8 +20,10 @@
|
|||
package org.apache.druid.msq.test;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.inject.Injector;
|
||||
import org.apache.druid.client.indexing.NoopOverlordClient;
|
||||
import org.apache.druid.indexing.common.TaskReport;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.druid.msq.exec.ControllerImpl;
|
|||
import org.apache.druid.msq.exec.WorkerMemoryParameters;
|
||||
import org.apache.druid.msq.indexing.MSQControllerTask;
|
||||
import org.apache.druid.msq.indexing.MSQSpec;
|
||||
import org.apache.druid.msq.indexing.report.MSQTaskReport;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.HashMap;
|
||||
|
@ -103,6 +106,18 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient
|
|||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId)
|
||||
{
|
||||
SettableFuture<Map<String, Object>> future = SettableFuture.create();
|
||||
future.set(
|
||||
ImmutableMap.of(
|
||||
MSQTaskReport.REPORT_KEY,
|
||||
getReportForTask(taskId).get(MSQTaskReport.REPORT_KEY)
|
||||
));
|
||||
return future;
|
||||
}
|
||||
|
||||
// hooks to pull stuff out for testing
|
||||
@Nullable
|
||||
Map<String, TaskReport> getReportForTask(String id)
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.msq.test;
|
||||
|
||||
import org.apache.druid.query.DataSource;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.QueryDataSource;
|
||||
import org.apache.druid.sql.calcite.QueryTestBuilder;
|
||||
import org.apache.druid.sql.calcite.QueryTestRunner;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Custom {@link QueryTestRunner.QueryVerifyStep} for the MSQ run queries. Since all the query types are not supported
|
||||
* therefore we skip the {@link QueryTestRunner.VerifyNativeQueries} check for some query types
|
||||
*/
|
||||
public class VerifyMSQSupportedNativeQueriesFactory implements QueryTestRunner.QueryVerifyStepFactory
|
||||
{
|
||||
@Override
|
||||
public QueryTestRunner.QueryVerifyStep make(QueryTestRunner.BaseExecuteQuery execStep)
|
||||
{
|
||||
return new QueryTestRunner.QueryVerifyStep()
|
||||
{
|
||||
@Override
|
||||
public void verify()
|
||||
{
|
||||
QueryTestBuilder builder = execStep.builder();
|
||||
final List<Query<?>> expectedQueries = builder.getExpectedQueries();
|
||||
final boolean unsupportedQuery = expectedQueries.stream().anyMatch(this::isUnsupportedQuery);
|
||||
if (unsupportedQuery) {
|
||||
return;
|
||||
}
|
||||
new QueryTestRunner.VerifyNativeQueries(execStep).verify();
|
||||
}
|
||||
|
||||
private boolean isUnsupportedQuery(Query<?> query)
|
||||
{
|
||||
if (!Objects.equals(query.getType(), Query.GROUP_BY) && !Objects.equals(query.getType(), Query.SCAN)) {
|
||||
return true;
|
||||
}
|
||||
DataSource dataSource = query.getDataSource();
|
||||
return isUnsupportedDataSource(dataSource);
|
||||
}
|
||||
|
||||
private boolean isUnsupportedDataSource(DataSource dataSource)
|
||||
{
|
||||
if (dataSource instanceof QueryDataSource) {
|
||||
final Query<?> subQuery = ((QueryDataSource) dataSource).getQuery();
|
||||
return isUnsupportedQuery(subQuery);
|
||||
} else {
|
||||
return dataSource.getChildren().stream().anyMatch(this::isUnsupportedDataSource);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -20,7 +20,9 @@
|
|||
package org.apache.druid.sql.calcite;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Injector;
|
||||
|
@ -108,7 +110,6 @@ import org.joda.time.chrono.ISOChronology;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
@ -260,11 +261,12 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@ClassRule
|
||||
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
public boolean cannotVectorize = false;
|
||||
public boolean skipVectorize = false;
|
||||
public boolean msqCompatible = false;
|
||||
|
||||
public QueryLogHook queryLogHook;
|
||||
|
||||
|
@ -543,11 +545,12 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
@Override
|
||||
public SqlEngine createEngine(
|
||||
final QueryLifecycleFactory qlf,
|
||||
final ObjectMapper queryJsonMapper
|
||||
final ObjectMapper queryJsonMapper,
|
||||
Injector injector
|
||||
)
|
||||
{
|
||||
if (engine0 == null) {
|
||||
return baseComponentSupplier.createEngine(qlf, queryJsonMapper);
|
||||
return baseComponentSupplier.createEngine(qlf, queryJsonMapper, injector);
|
||||
} else {
|
||||
return engine0;
|
||||
}
|
||||
|
@ -830,11 +833,25 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
{
|
||||
return new QueryTestBuilder(new CalciteTestConfig())
|
||||
.cannotVectorize(cannotVectorize)
|
||||
.skipVectorize(skipVectorize);
|
||||
.skipVectorize(skipVectorize)
|
||||
.msqCompatible(msqCompatible);
|
||||
}
|
||||
|
||||
public class CalciteTestConfig implements QueryTestBuilder.QueryTestConfig
|
||||
{
|
||||
private boolean isRunningMSQ = false;
|
||||
|
||||
public CalciteTestConfig()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public CalciteTestConfig(boolean isRunningMSQ)
|
||||
{
|
||||
this.isRunningMSQ = isRunningMSQ;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public QueryLogHook queryLogHook()
|
||||
{
|
||||
|
@ -870,6 +887,12 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
expectedResultSignature
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunningMSQ()
|
||||
{
|
||||
return isRunningMSQ;
|
||||
}
|
||||
}
|
||||
|
||||
public Set<ResourceAction> analyzeResources(
|
||||
|
@ -998,6 +1021,11 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
skipVectorize = true;
|
||||
}
|
||||
|
||||
protected void msqCompatible()
|
||||
{
|
||||
msqCompatible = true;
|
||||
}
|
||||
|
||||
protected static boolean isRewriteJoinToFilter(final Map<String, Object> queryContext)
|
||||
{
|
||||
return (boolean) queryContext.getOrDefault(
|
||||
|
@ -1006,28 +1034,39 @@ public class BaseCalciteQueryTest extends CalciteTestBase
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Override not just the outer query context, but also the contexts of all subqueries.
|
||||
* @return
|
||||
*/
|
||||
public static <T> Query<T> recursivelyOverrideContext(final Query<T> query, final Map<String, Object> context)
|
||||
public static <T> Query recursivelyClearContext(final Query<T> query, ObjectMapper queryJsonMapper)
|
||||
{
|
||||
return query.withDataSource(recursivelyOverrideContext(query.getDataSource(), context))
|
||||
.withOverriddenContext(context);
|
||||
try {
|
||||
Query<T> newQuery = query.withDataSource(recursivelyClearContext(query.getDataSource(), queryJsonMapper));
|
||||
final JsonNode newQueryNode = queryJsonMapper.valueToTree(newQuery);
|
||||
((ObjectNode) newQueryNode).remove("context");
|
||||
return queryJsonMapper.treeToValue(newQueryNode, Query.class);
|
||||
}
|
||||
catch (Exception e) {
|
||||
Assert.fail(e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the contexts of all subqueries of a particular datasource.
|
||||
*/
|
||||
private static DataSource recursivelyOverrideContext(final DataSource dataSource, final Map<String, Object> context)
|
||||
private static DataSource recursivelyClearContext(final DataSource dataSource, ObjectMapper queryJsonMapper)
|
||||
{
|
||||
if (dataSource instanceof QueryDataSource) {
|
||||
final Query<?> subquery = ((QueryDataSource) dataSource).getQuery();
|
||||
return new QueryDataSource(recursivelyOverrideContext(subquery, context));
|
||||
Query<?> newSubQuery = recursivelyClearContext(subquery, queryJsonMapper);
|
||||
return new QueryDataSource(newSubQuery);
|
||||
} else {
|
||||
return dataSource.withChildren(
|
||||
dataSource.getChildren()
|
||||
.stream()
|
||||
.map(ds -> recursivelyOverrideContext(ds, context))
|
||||
.map(ds -> recursivelyClearContext(ds, queryJsonMapper))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Injector;
|
||||
import org.apache.calcite.rel.RelRoot;
|
||||
import org.apache.calcite.rel.type.RelDataType;
|
||||
import org.apache.calcite.rel.type.RelDataTypeFactory;
|
||||
|
@ -83,11 +84,12 @@ public class CalciteScanSignatureTest extends BaseCalciteQueryTest
|
|||
@Override
|
||||
public SqlEngine createEngine(
|
||||
QueryLifecycleFactory qlf,
|
||||
ObjectMapper queryJsonMapper
|
||||
ObjectMapper queryJsonMapper,
|
||||
Injector injector
|
||||
)
|
||||
{
|
||||
// Create an engine that says yes to EngineFeature.SCAN_NEEDS_SIGNATURE.
|
||||
return new ScanSignatureTestSqlEngine(super.createEngine(qlf, queryJsonMapper));
|
||||
return new ScanSignatureTestSqlEngine(super.createEngine(qlf, queryJsonMapper, injector));
|
||||
}
|
||||
|
||||
private static class ScanSignatureTestSqlEngine implements SqlEngine
|
||||
|
|
|
@ -76,6 +76,8 @@ public class QueryTestBuilder
|
|||
|
||||
PlannerFixture plannerFixture(PlannerConfig plannerConfig, AuthConfig authConfig);
|
||||
ResultsVerifier defaultResultsVerifier(List<Object[]> expectedResults, RowSignature expectedResultSignature);
|
||||
|
||||
boolean isRunningMSQ();
|
||||
}
|
||||
|
||||
protected final QueryTestConfig config;
|
||||
|
@ -86,6 +88,7 @@ public class QueryTestBuilder
|
|||
protected AuthenticationResult authenticationResult = CalciteTests.REGULAR_USER_AUTH_RESULT;
|
||||
protected List<Query<?>> expectedQueries;
|
||||
protected List<Object[]> expectedResults;
|
||||
protected List<QueryTestRunner.QueryRunStepFactory> customRunners = new ArrayList<>();
|
||||
protected List<QueryTestRunner.QueryVerifyStepFactory> customVerifications = new ArrayList<>();
|
||||
protected RowSignature expectedResultSignature;
|
||||
protected List<ResourceAction> expectedResources;
|
||||
|
@ -93,7 +96,9 @@ public class QueryTestBuilder
|
|||
@Nullable
|
||||
protected Consumer<ExpectedException> expectedExceptionInitializer;
|
||||
protected boolean skipVectorize;
|
||||
protected boolean msqCompatible;
|
||||
protected boolean queryCannotVectorize;
|
||||
protected boolean verifyNativeQueries = true;
|
||||
protected AuthConfig authConfig = new AuthConfig();
|
||||
protected PlannerFixture plannerFixture;
|
||||
protected String expectedLogicalPlan;
|
||||
|
@ -156,6 +161,23 @@ public class QueryTestBuilder
|
|||
return this;
|
||||
}
|
||||
|
||||
public QueryTestBuilder addCustomRunner(
|
||||
QueryTestRunner.QueryRunStepFactory factory
|
||||
)
|
||||
{
|
||||
this.customRunners.add(factory);
|
||||
return this;
|
||||
}
|
||||
|
||||
public QueryTestBuilder setCustomRunners(
|
||||
List<QueryTestRunner.QueryRunStepFactory> factories
|
||||
)
|
||||
{
|
||||
this.customRunners = new ArrayList<>(factories);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public QueryTestBuilder addCustomVerification(
|
||||
QueryTestRunner.QueryVerifyStepFactory factory
|
||||
)
|
||||
|
@ -210,6 +232,18 @@ public class QueryTestBuilder
|
|||
return this;
|
||||
}
|
||||
|
||||
public QueryTestBuilder msqCompatible(boolean msqCompatible)
|
||||
{
|
||||
this.msqCompatible = msqCompatible;
|
||||
return this;
|
||||
}
|
||||
|
||||
public QueryTestBuilder verifyNativeQueries(boolean verifyNativeQueries)
|
||||
{
|
||||
this.verifyNativeQueries = verifyNativeQueries;
|
||||
return this;
|
||||
}
|
||||
|
||||
public QueryTestBuilder cannotVectorize()
|
||||
{
|
||||
return cannotVectorize(true);
|
||||
|
@ -252,6 +286,11 @@ public class QueryTestBuilder
|
|||
return this;
|
||||
}
|
||||
|
||||
public List<Query<?>> getExpectedQueries()
|
||||
{
|
||||
return expectedQueries;
|
||||
}
|
||||
|
||||
public QueryTestRunner build()
|
||||
{
|
||||
return new QueryTestRunner(this);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.druid.sql.calcite.planner.PrepareResult;
|
|||
import org.apache.druid.sql.calcite.table.RowSignatures;
|
||||
import org.apache.druid.sql.calcite.util.QueryLogHook;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -55,6 +56,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Runs a test built up by {@link QueryTestBuilder}. Running a SQL query test
|
||||
|
@ -71,7 +73,12 @@ public class QueryTestRunner
|
|||
{
|
||||
public interface QueryVerifyStepFactory
|
||||
{
|
||||
QueryVerifyStep make(ExecuteQuery execStep);
|
||||
QueryVerifyStep make(BaseExecuteQuery execStep);
|
||||
}
|
||||
|
||||
public interface QueryRunStepFactory
|
||||
{
|
||||
QueryRunStep make(QueryTestBuilder builder, QueryTestRunner.BaseExecuteQuery execStep);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -153,6 +160,11 @@ public class QueryTestRunner
|
|||
this.capture = null;
|
||||
this.sqlSignature = null;
|
||||
}
|
||||
|
||||
public QueryResults withResults(List<Object[]> newResults)
|
||||
{
|
||||
return new QueryResults(queryContext, vectorizeOption, sqlSignature, newResults, recordedQueries, capture);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -180,10 +192,10 @@ public class QueryTestRunner
|
|||
{
|
||||
final QueryTestBuilder builder = builder();
|
||||
final SqlQueryPlus sqlQuery = SqlQueryPlus.builder(builder.sql)
|
||||
.context(builder.queryContext)
|
||||
.sqlParameters(builder.parameters)
|
||||
.auth(builder.authenticationResult)
|
||||
.build();
|
||||
.context(builder.queryContext)
|
||||
.sqlParameters(builder.parameters)
|
||||
.auth(builder.authenticationResult)
|
||||
.build();
|
||||
final SqlStatementFactory sqlStatementFactory = builder.statementFactory();
|
||||
final PreparedStatement stmt = sqlStatementFactory.preparedStatement(sqlQuery);
|
||||
final PrepareResult prepareResult = stmt.prepare();
|
||||
|
@ -192,16 +204,12 @@ public class QueryTestRunner
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a query up to three times with different vectorization options.
|
||||
* Captures the results, signature and native queries for each run.
|
||||
*/
|
||||
public static class ExecuteQuery extends QueryRunStep
|
||||
public abstract static class BaseExecuteQuery extends QueryRunStep
|
||||
{
|
||||
private final List<QueryResults> results = new ArrayList<>();
|
||||
private final boolean doCapture;
|
||||
protected final List<QueryResults> results = new ArrayList<>();
|
||||
protected final boolean doCapture;
|
||||
|
||||
public ExecuteQuery(QueryTestBuilder builder)
|
||||
public BaseExecuteQuery(QueryTestBuilder builder)
|
||||
{
|
||||
super(builder);
|
||||
doCapture = builder.expectedLogicalPlan != null;
|
||||
|
@ -211,6 +219,18 @@ public class QueryTestRunner
|
|||
{
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a query up to three times with different vectorization options.
|
||||
* Captures the results, signature and native queries for each run.
|
||||
*/
|
||||
public static class ExecuteQuery extends BaseExecuteQuery
|
||||
{
|
||||
public ExecuteQuery(QueryTestBuilder builder)
|
||||
{
|
||||
super(builder);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
|
@ -221,9 +241,9 @@ public class QueryTestRunner
|
|||
|
||||
final SqlStatementFactory sqlStatementFactory = builder.statementFactory();
|
||||
final SqlQueryPlus sqlQuery = SqlQueryPlus.builder(builder.sql)
|
||||
.sqlParameters(builder.parameters)
|
||||
.auth(builder.authenticationResult)
|
||||
.build();
|
||||
.sqlParameters(builder.parameters)
|
||||
.auth(builder.authenticationResult)
|
||||
.build();
|
||||
|
||||
final List<String> vectorizeValues = new ArrayList<>();
|
||||
vectorizeValues.add("false");
|
||||
|
@ -300,9 +320,9 @@ public class QueryTestRunner
|
|||
*/
|
||||
public static class VerifyResults implements QueryVerifyStep
|
||||
{
|
||||
protected final ExecuteQuery execStep;
|
||||
protected final BaseExecuteQuery execStep;
|
||||
|
||||
public VerifyResults(ExecuteQuery execStep)
|
||||
public VerifyResults(BaseExecuteQuery execStep)
|
||||
{
|
||||
this.execStep = execStep;
|
||||
}
|
||||
|
@ -337,9 +357,9 @@ public class QueryTestRunner
|
|||
*/
|
||||
public static class VerifyNativeQueries implements QueryVerifyStep
|
||||
{
|
||||
protected final ExecuteQuery execStep;
|
||||
protected final BaseExecuteQuery execStep;
|
||||
|
||||
public VerifyNativeQueries(ExecuteQuery execStep)
|
||||
public VerifyNativeQueries(BaseExecuteQuery execStep)
|
||||
{
|
||||
this.execStep = execStep;
|
||||
}
|
||||
|
@ -359,6 +379,7 @@ public class QueryTestRunner
|
|||
}
|
||||
QueryTestBuilder builder = execStep.builder();
|
||||
final List<Query<?>> expectedQueries = new ArrayList<>();
|
||||
ObjectMapper queryJsonMapper = builder.config.jsonMapper();
|
||||
for (Query<?> query : builder.expectedQueries) {
|
||||
// The tests set a lot of various values in the context that are not relevant to how the query actually planned,
|
||||
// so we effectively ignore these keys in the context during query validation by overwriting whatever
|
||||
|
@ -371,16 +392,19 @@ public class QueryTestRunner
|
|||
// we could have validations of query objects that are a bit more intelligent. That is, instead of relying on
|
||||
// equals, perhaps we could have a context validator that only validates that keys set on the expected query
|
||||
// are set, allowing any other context keys to also be set?
|
||||
expectedQueries.add(BaseCalciteQueryTest.recursivelyOverrideContext(query, queryResults.queryContext));
|
||||
expectedQueries.add(BaseCalciteQueryTest.recursivelyClearContext(query, queryJsonMapper));
|
||||
}
|
||||
|
||||
final List<Query<?>> recordedQueries = queryResults.recordedQueries;
|
||||
final List<Query> recordedQueries = queryResults.recordedQueries
|
||||
.stream()
|
||||
.map(q -> BaseCalciteQueryTest.recursivelyClearContext(q, queryJsonMapper))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Assert.assertEquals(
|
||||
StringUtils.format("query count: %s", builder.sql),
|
||||
expectedQueries.size(),
|
||||
recordedQueries.size()
|
||||
);
|
||||
ObjectMapper queryJsonMapper = builder.config.jsonMapper();
|
||||
for (int i = 0; i < expectedQueries.size(); i++) {
|
||||
Assert.assertEquals(
|
||||
StringUtils.format("query #%d: %s", i + 1, builder.sql),
|
||||
|
@ -451,14 +475,15 @@ public class QueryTestRunner
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Verify resources for a prepared query against the expected list.
|
||||
*/
|
||||
public static class VerifyExecuteSignature implements QueryVerifyStep
|
||||
{
|
||||
private final ExecuteQuery execStep;
|
||||
private final BaseExecuteQuery execStep;
|
||||
|
||||
public VerifyExecuteSignature(ExecuteQuery execStep)
|
||||
public VerifyExecuteSignature(BaseExecuteQuery execStep)
|
||||
{
|
||||
this.execStep = execStep;
|
||||
}
|
||||
|
@ -478,9 +503,9 @@ public class QueryTestRunner
|
|||
|
||||
public static class VerifyLogicalPlan implements QueryVerifyStep
|
||||
{
|
||||
private final ExecuteQuery execStep;
|
||||
private final BaseExecuteQuery execStep;
|
||||
|
||||
public VerifyLogicalPlan(ExecuteQuery execStep)
|
||||
public VerifyLogicalPlan(BaseExecuteQuery execStep)
|
||||
{
|
||||
this.execStep = execStep;
|
||||
}
|
||||
|
@ -542,11 +567,12 @@ public class QueryTestRunner
|
|||
* after the first failure. It would be better to check all three
|
||||
* runs, but that's an exercise for later.
|
||||
*/
|
||||
|
||||
public static class VerifyExpectedException implements QueryVerifyStep
|
||||
{
|
||||
protected final ExecuteQuery execStep;
|
||||
protected final BaseExecuteQuery execStep;
|
||||
|
||||
public VerifyExpectedException(ExecuteQuery execStep)
|
||||
public VerifyExpectedException(BaseExecuteQuery execStep)
|
||||
{
|
||||
this.execStep = execStep;
|
||||
}
|
||||
|
@ -583,6 +609,7 @@ public class QueryTestRunner
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private final List<QueryTestRunner.QueryRunStep> runSteps = new ArrayList<>();
|
||||
private final List<QueryTestRunner.QueryVerifyStep> verifySteps = new ArrayList<>();
|
||||
|
||||
|
@ -592,6 +619,7 @@ public class QueryTestRunner
|
|||
public QueryTestRunner(QueryTestBuilder builder)
|
||||
{
|
||||
QueryTestConfig config = builder.config;
|
||||
Assume.assumeTrue(!config.isRunningMSQ() || builder.msqCompatible);
|
||||
if (builder.expectedResultsVerifier == null && builder.expectedResults != null) {
|
||||
builder.expectedResultsVerifier = config.defaultResultsVerifier(
|
||||
builder.expectedResults,
|
||||
|
@ -607,43 +635,49 @@ public class QueryTestRunner
|
|||
builder.expectedResultsVerifier == null,
|
||||
"Cannot check both results and resources"
|
||||
);
|
||||
QueryTestRunner.PrepareQuery execStep = new QueryTestRunner.PrepareQuery(builder);
|
||||
PrepareQuery execStep = new PrepareQuery(builder);
|
||||
runSteps.add(execStep);
|
||||
verifySteps.add(new QueryTestRunner.VerifyResources(execStep));
|
||||
verifySteps.add(new VerifyResources(execStep));
|
||||
if (builder.expectedSqlSchema != null) {
|
||||
verifySteps.add(new VerifyPrepareSignature(execStep));
|
||||
}
|
||||
} else {
|
||||
QueryTestRunner.ExecuteQuery execStep = new QueryTestRunner.ExecuteQuery(builder);
|
||||
BaseExecuteQuery execStep = new ExecuteQuery(builder);
|
||||
runSteps.add(execStep);
|
||||
if (!builder.customRunners.isEmpty()) {
|
||||
for (QueryRunStepFactory factory : builder.customRunners) {
|
||||
runSteps.add(factory.make(builder, (BaseExecuteQuery) runSteps.get(runSteps.size() - 1)));
|
||||
}
|
||||
}
|
||||
BaseExecuteQuery finalExecStep = (BaseExecuteQuery) runSteps.get(runSteps.size() - 1);
|
||||
|
||||
// Verify the logical plan, if requested.
|
||||
if (builder.expectedLogicalPlan != null) {
|
||||
verifySteps.add(new QueryTestRunner.VerifyLogicalPlan(execStep));
|
||||
verifySteps.add(new VerifyLogicalPlan(finalExecStep));
|
||||
}
|
||||
|
||||
if (builder.expectedSqlSchema != null) {
|
||||
verifySteps.add(new VerifyExecuteSignature(execStep));
|
||||
verifySteps.add(new VerifyExecuteSignature(finalExecStep));
|
||||
}
|
||||
|
||||
// Verify native queries before results. (Note: change from prior pattern
|
||||
// that reversed the steps.
|
||||
if (builder.expectedQueries != null) {
|
||||
verifySteps.add(new QueryTestRunner.VerifyNativeQueries(execStep));
|
||||
if (builder.verifyNativeQueries && builder.expectedQueries != null) {
|
||||
verifySteps.add(new VerifyNativeQueries(finalExecStep));
|
||||
}
|
||||
if (builder.expectedResultsVerifier != null) {
|
||||
verifySteps.add(new QueryTestRunner.VerifyResults(execStep));
|
||||
verifySteps.add(new VerifyResults(finalExecStep));
|
||||
}
|
||||
|
||||
if (!builder.customVerifications.isEmpty()) {
|
||||
for (QueryTestRunner.QueryVerifyStepFactory customVerification : builder.customVerifications) {
|
||||
verifySteps.add(customVerification.make(execStep));
|
||||
for (QueryVerifyStepFactory customVerification : builder.customVerifications) {
|
||||
verifySteps.add(customVerification.make(finalExecStep));
|
||||
}
|
||||
}
|
||||
|
||||
// The exception is always verified: either there should be no exception
|
||||
// (the other steps ran), or there should be the defined exception.
|
||||
verifySteps.add(new QueryTestRunner.VerifyExpectedException(execStep));
|
||||
verifySteps.add(new VerifyExpectedException(finalExecStep));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -652,10 +686,10 @@ public class QueryTestRunner
|
|||
*/
|
||||
public void run()
|
||||
{
|
||||
for (QueryTestRunner.QueryRunStep runStep : runSteps) {
|
||||
for (QueryRunStep runStep : runSteps) {
|
||||
runStep.run();
|
||||
}
|
||||
for (QueryTestRunner.QueryVerifyStep verifyStep : verifySteps) {
|
||||
for (QueryVerifyStep verifyStep : verifySteps) {
|
||||
verifyStep.verify();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ public class QueryVerification
|
|||
}
|
||||
|
||||
@Override
|
||||
public QueryTestRunner.QueryVerifyStep make(QueryTestRunner.ExecuteQuery execStep)
|
||||
public QueryTestRunner.QueryVerifyStep make(QueryTestRunner.BaseExecuteQuery execStep)
|
||||
{
|
||||
return () -> {
|
||||
for (QueryTestRunner.QueryResults queryResults : execStep.results()) {
|
||||
|
|
|
@ -147,7 +147,8 @@ public class SqlTestFramework
|
|||
|
||||
SqlEngine createEngine(
|
||||
QueryLifecycleFactory qlf,
|
||||
ObjectMapper objectMapper
|
||||
ObjectMapper objectMapper,
|
||||
Injector injector
|
||||
);
|
||||
|
||||
/**
|
||||
|
@ -238,7 +239,11 @@ public class SqlTestFramework
|
|||
}
|
||||
|
||||
@Override
|
||||
public SqlEngine createEngine(QueryLifecycleFactory qlf, ObjectMapper objectMapper)
|
||||
public SqlEngine createEngine(
|
||||
QueryLifecycleFactory qlf,
|
||||
ObjectMapper objectMapper,
|
||||
Injector injector
|
||||
)
|
||||
{
|
||||
return new NativeSqlEngine(
|
||||
qlf,
|
||||
|
@ -543,7 +548,7 @@ public class SqlTestFramework
|
|||
.addModule(new TestSetupModule(builder));
|
||||
builder.componentSupplier.configureGuice(injectorBuilder);
|
||||
this.injector = injectorBuilder.build();
|
||||
this.engine = builder.componentSupplier.createEngine(queryLifecycleFactory(), queryJsonMapper());
|
||||
this.engine = builder.componentSupplier.createEngine(queryLifecycleFactory(), queryJsonMapper(), injector);
|
||||
componentSupplier.configureJsonMapper(queryJsonMapper());
|
||||
componentSupplier.finalizeTestFramework(this);
|
||||
}
|
||||
|
@ -553,6 +558,11 @@ public class SqlTestFramework
|
|||
return injector;
|
||||
}
|
||||
|
||||
public SqlEngine engine()
|
||||
{
|
||||
return engine;
|
||||
}
|
||||
|
||||
public ObjectMapper queryJsonMapper()
|
||||
{
|
||||
return injector.getInstance(ObjectMapper.class);
|
||||
|
|
|
@ -213,7 +213,7 @@ public class TestDataBuilder
|
|||
.withRollup(false)
|
||||
.build();
|
||||
|
||||
private static final IncrementalIndexSchema INDEX_SCHEMA_NUMERIC_DIMS = new IncrementalIndexSchema.Builder()
|
||||
public static final IncrementalIndexSchema INDEX_SCHEMA_NUMERIC_DIMS = new IncrementalIndexSchema.Builder()
|
||||
.withMetrics(
|
||||
new CountAggregatorFactory("cnt"),
|
||||
new FloatSumAggregatorFactory("m1", "m1"),
|
||||
|
|
Loading…
Reference in New Issue