Add MSQ engine support for window function drill tests (#16665)

* Add MSQ engine support for window function drill tests

* Address review comments

* Revert formatting changes in TestDataBuilder
This commit is contained in:
Akshat Jain 2024-06-28 11:14:17 +05:30 committed by GitHub
parent c96e783750
commit 34c80ee3de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 308 additions and 159 deletions

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.MSQDrillWindowQueryTest.DrillWindowQueryMSQComponentSupplier;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.CalciteMSQTestsHelper;
import org.apache.druid.msq.test.ExtractResultsFactory;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.msq.test.VerifyMSQSupportedNativeQueriesPredicate;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.DrillWindowQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
@SqlTestFrameworkConfig.ComponentSupplier(DrillWindowQueryMSQComponentSupplier.class)
public class MSQDrillWindowQueryTest extends DrillWindowQueryTest
{
public static class DrillWindowQueryMSQComponentSupplier extends DrillComponentSupplier
{
public DrillWindowQueryMSQComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}
@Override
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModules(CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0]));
}
@Override
public SqlEngine createEngine(
QueryLifecycleFactory qlf,
ObjectMapper queryJsonMapper,
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters =
WorkerMemoryParameters.createInstance(
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
2,
0,
0
);
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
}
@Override
protected QueryTestBuilder testBuilder()
{
return new QueryTestBuilder(new CalciteTestConfig(true))
.addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
.skipVectorize(true)
.verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate());
}
}

View File

@ -391,6 +391,13 @@ public class CalciteMSQTestsHelper
case CalciteTests.WIKIPEDIA_FIRST_LAST: case CalciteTests.WIKIPEDIA_FIRST_LAST:
index = TestDataBuilder.makeWikipediaIndexWithAggregation(tempFolderProducer.apply("tmpDir")); index = TestDataBuilder.makeWikipediaIndexWithAggregation(tempFolderProducer.apply("tmpDir"));
break; break;
case CalciteTests.TBL_WITH_NULLS_PARQUET:
case CalciteTests.SML_TBL_PARQUET:
case CalciteTests.ALL_TYPES_UNIQ_PARQUET:
case CalciteTests.FEW_ROWS_ALL_DATA_PARQUET:
case CalciteTests.T_ALL_TYPE_PARQUET:
index = TestDataBuilder.getQueryableIndexForDrillDatasource(segmentId.getDataSource(), tempFolderProducer.apply("tmpDir"));
break;
default: default:
throw new ISE("Cannot query segment %s in test runner", segmentId); throw new ISE("Cannot query segment %s in test runner", segmentId);

View File

@ -6870,6 +6870,7 @@ public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
@Test @Test
public void testJsonQueryArrays() public void testJsonQueryArrays()
{ {
msqIncompatible();
testBuilder() testBuilder()
.sql("SELECT JSON_QUERY_ARRAY(arrayObject, '$') FROM druid.arrays") .sql("SELECT JSON_QUERY_ARRAY(arrayObject, '$') FROM druid.arrays")
.queryContext(QUERY_CONTEXT_DEFAULT) .queryContext(QUERY_CONTEXT_DEFAULT)

View File

@ -19,38 +19,22 @@
package org.apache.druid.sql.calcite; package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.inject.Injector; import com.google.inject.Injector;
import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Numbers; import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.TimestampParser; import org.apache.druid.java.util.common.parsers.TimestampParser;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.DisableUnless.DisableUnlessRule; import org.apache.druid.sql.calcite.DisableUnless.DisableUnlessRule;
import org.apache.druid.sql.calcite.DrillWindowQueryTest.DrillComponentSupplier; import org.apache.druid.sql.calcite.DrillWindowQueryTest.DrillComponentSupplier;
@ -60,8 +44,7 @@ import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.planner.PlannerCaptureHook; import org.apache.druid.sql.calcite.planner.PlannerCaptureHook;
import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier; import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.LocalTime; import org.joda.time.LocalTime;
import org.junit.Assert; import org.junit.Assert;
@ -88,7 +71,6 @@ import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -113,8 +95,6 @@ import static org.junit.Assert.fail;
@SqlTestFrameworkConfig.ComponentSupplier(DrillComponentSupplier.class) @SqlTestFrameworkConfig.ComponentSupplier(DrillComponentSupplier.class)
public class DrillWindowQueryTest extends BaseCalciteQueryTest public class DrillWindowQueryTest extends BaseCalciteQueryTest
{ {
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
static { static {
NullHandling.initializeForTests(); NullHandling.initializeForTests();
} }
@ -259,135 +239,9 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
joinableFactory, joinableFactory,
injector); injector);
attachIndex(
retVal,
"tblWnulls.parquet",
new LongDimensionSchema("c1"),
new StringDimensionSchema("c2"));
// {"col0":1,"col1":65534,"col2":256.0,"col3":1234.9,"col4":73578580,"col5":1393720082338,"col6":421185052800000,"col7":false,"col8":"CA","col9":"AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXZ"}
attachIndex(
retVal,
"allTypsUniq.parquet",
new LongDimensionSchema("col0"),
new LongDimensionSchema("col1"),
new DoubleDimensionSchema("col2"),
new DoubleDimensionSchema("col3"),
new LongDimensionSchema("col4"),
new LongDimensionSchema("col5"),
new LongDimensionSchema("col6"),
new StringDimensionSchema("col7"),
new StringDimensionSchema("col8"),
new StringDimensionSchema("col9"));
attachIndex(
retVal,
"smlTbl.parquet",
// "col_int": 8122,
new LongDimensionSchema("col_int"),
// "col_bgint": 817200,
new LongDimensionSchema("col_bgint"),
// "col_char_2": "IN",
new StringDimensionSchema("col_char_2"),
// "col_vchar_52":
// "AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB",
new StringDimensionSchema("col_vchar_52"),
// "col_tmstmp": 1409617682418,
new LongDimensionSchema("col_tmstmp"),
// "col_dt": 422717616000000,
new LongDimensionSchema("col_dt"),
// "col_booln": false,
new StringDimensionSchema("col_booln"),
// "col_dbl": 12900.48,
new DoubleDimensionSchema("col_dbl"),
// "col_tm": 33109170
new LongDimensionSchema("col_tm"));
attachIndex(
retVal,
"fewRowsAllData.parquet",
// "col0":12024,
new LongDimensionSchema("col0"),
// "col1":307168,
new LongDimensionSchema("col1"),
// "col2":"VT",
new StringDimensionSchema("col2"),
// "col3":"DXXXXXXXXXXXXXXXXXXXXXXXXXEXXXXXXXXXXXXXXXXXXXXXXXXF",
new StringDimensionSchema("col3"),
// "col4":1338596882419,
new LongDimensionSchema("col4"),
// "col5":422705433600000,
new LongDimensionSchema("col5"),
// "col6":true,
new StringDimensionSchema("col6"),
// "col7":3.95110006277E8,
new DoubleDimensionSchema("col7"),
// "col8":67465430
new LongDimensionSchema("col8"));
attachIndex(
retVal,
"t_alltype.parquet",
// "c1":1,
new LongDimensionSchema("c1"),
// "c2":592475043,
new LongDimensionSchema("c2"),
// "c3":616080519999272,
new LongDimensionSchema("c3"),
// "c4":"ObHeWTDEcbGzssDwPwurfs",
new StringDimensionSchema("c4"),
// "c5":"0sZxIfZ CGwTOaLWZ6nWkUNx",
new StringDimensionSchema("c5"),
// "c6":1456290852307,
new LongDimensionSchema("c6"),
// "c7":421426627200000,
new LongDimensionSchema("c7"),
// "c8":true,
new StringDimensionSchema("c8"),
// "c9":0.626179100469
new DoubleDimensionSchema("c9"));
return retVal;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void attachIndex(SpecificSegmentsQuerySegmentWalker texasRanger, String dataSource, DimensionSchema... dims)
{
ArrayList<String> dimensionNames = new ArrayList<>(dims.length);
for (DimensionSchema dimension : dims) {
dimensionNames.add(dimension.getName());
}
final File tmpFolder = tempDirProducer.newTempFolder(); final File tmpFolder = tempDirProducer.newTempFolder();
final QueryableIndex queryableIndex = IndexBuilder TestDataBuilder.attachIndexesForDrillTestDatasources(retVal, tmpFolder);
.create() return retVal;
.tmpDir(new File(tmpFolder, dataSource))
.segmentWriteOutMediumFactory(OnHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(new IncrementalIndexSchema.Builder()
.withRollup(false)
.withDimensionsSpec(new DimensionsSpec(Arrays.asList(dims)))
.build())
.rows(
() -> {
try {
return Iterators.transform(
MAPPER.readerFor(Map.class)
.readValues(
ClassLoader.getSystemResource("drill/window/datasources/" + dataSource + ".json")),
(Function<Map, InputRow>) input -> new MapBasedInputRow(0, dimensionNames, input));
}
catch (IOException e) {
throw new RE(e, "problem reading file");
}
})
.buildMMappedIndex();
texasRanger.add(
DataSegment.builder()
.dataSource(dataSource)
.interval(Intervals.ETERNITY)
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
queryableIndex);
} }
} }
@ -421,7 +275,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
try { try {
Assert.assertEquals(StringUtils.format("result count: %s", sql), expectedResultsText.size(), results.size()); Assert.assertEquals(StringUtils.format("result count: %s", sql), expectedResultsText.size(), results.size());
if (!isOrdered(queryResults)) { if (!isOrdered(queryResults)) {
// in case the resultset is not ordered; order via the same comparator before comparision // in case the resultset is not ordered; order via the same comparator before comparison
results.sort(new ArrayRowCmp()); results.sort(new ArrayRowCmp());
expectedResults.sort(new ArrayRowCmp()); expectedResults.sort(new ArrayRowCmp());
} }
@ -4410,6 +4264,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_aggregates_winFnQry_83() public void test_aggregates_winFnQry_83()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6302,6 +6157,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_aggregates_winFnQry_84() public void test_aggregates_winFnQry_84()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6309,6 +6165,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_aggregates_winFnQry_85() public void test_aggregates_winFnQry_85()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6694,6 +6551,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_avg_mulwds() public void test_frameclause_multipl_wnwds_avg_mulwds()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6701,6 +6559,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_count_mulwds() public void test_frameclause_multipl_wnwds_count_mulwds()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6708,6 +6567,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_fval_mulwds() public void test_frameclause_multipl_wnwds_fval_mulwds()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6715,6 +6575,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_lval_mulwds() public void test_frameclause_multipl_wnwds_lval_mulwds()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6722,6 +6583,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_mulwind_08() public void test_frameclause_multipl_wnwds_mulwind_08()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6729,6 +6591,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_mulwind_09() public void test_frameclause_multipl_wnwds_mulwind_09()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -6736,6 +6599,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_sum_mulwds() public void test_frameclause_multipl_wnwds_sum_mulwds()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7508,6 +7372,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_rnkNoFrm01() public void test_frameclause_multipl_wnwds_rnkNoFrm01()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7515,6 +7380,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_rnkNoFrm02() public void test_frameclause_multipl_wnwds_rnkNoFrm02()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7522,6 +7388,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_rnkNoFrm03() public void test_frameclause_multipl_wnwds_rnkNoFrm03()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7529,6 +7396,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_rnkNoFrm04() public void test_frameclause_multipl_wnwds_rnkNoFrm04()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7536,6 +7404,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_rnkNoFrm05() public void test_frameclause_multipl_wnwds_rnkNoFrm05()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7543,6 +7412,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_frameclause_multipl_wnwds_rnkNoFrm06() public void test_frameclause_multipl_wnwds_rnkNoFrm06()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }
@ -7655,6 +7525,7 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
@Test @Test
public void test_nestedAggs_multiWin_6() public void test_nestedAggs_multiWin_6()
{ {
msqIncompatible();
windowQueryTest(); windowQueryTest();
} }

View File

@ -171,7 +171,7 @@ public @interface NotYetSupported
Matcher m = annotation.value().getPattern().matcher(trace); Matcher m = annotation.value().getPattern().matcher(trace);
if (!m.find()) { if (!m.find()) {
throw new AssertionError("Exception stactrace doesn't match regex: " + annotation.value().regex, e); throw new AssertionError("Exception stacktrace doesn't match regex: " + annotation.value().regex, e);
} }
throw new AssumptionViolatedException("Test is not-yet supported; ignored with:" + annotation); throw new AssumptionViolatedException("Test is not-yet supported; ignored with:" + annotation);
} }

View File

@ -356,15 +356,12 @@ public class QueryTestRunner
public static class VerifyResults implements QueryVerifyStep public static class VerifyResults implements QueryVerifyStep
{ {
protected final BaseExecuteQuery execStep; protected final BaseExecuteQuery execStep;
protected final boolean verifyRowSignature;
public VerifyResults( public VerifyResults(
BaseExecuteQuery execStep, BaseExecuteQuery execStep
boolean verifyRowSignature
) )
{ {
this.execStep = execStep; this.execStep = execStep;
this.verifyRowSignature = verifyRowSignature;
} }
@Override @Override
@ -386,9 +383,7 @@ public class QueryTestRunner
} }
QueryTestBuilder builder = execStep.builder(); QueryTestBuilder builder = execStep.builder();
if (verifyRowSignature) { builder.expectedResultsVerifier.verifyRowSignature(queryResults.signature);
builder.expectedResultsVerifier.verifyRowSignature(queryResults.signature);
}
builder.expectedResultsVerifier.verify(builder.sql, queryResults); builder.expectedResultsVerifier.verify(builder.sql, queryResults);
} }
} }
@ -747,7 +742,7 @@ public class QueryTestRunner
if (builder.expectedResultsVerifier != null) { if (builder.expectedResultsVerifier != null) {
// Don't verify the row signature when MSQ is running, since the broker receives the task id, and the signature // Don't verify the row signature when MSQ is running, since the broker receives the task id, and the signature
// would be {TASK:STRING} instead of the expected results signature // would be {TASK:STRING} instead of the expected results signature
verifySteps.add(new VerifyResults(finalExecStep, !config.isRunningMSQ())); verifySteps.add(new VerifyResults(finalExecStep));
} }
if (!builder.customVerifications.isEmpty()) { if (!builder.customVerifications.isEmpty()) {

View File

@ -125,6 +125,11 @@ public class CalciteTests
public static final String DRUID_SCHEMA_NAME = "druid"; public static final String DRUID_SCHEMA_NAME = "druid";
public static final String WIKIPEDIA = "wikipedia"; public static final String WIKIPEDIA = "wikipedia";
public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last"; public static final String WIKIPEDIA_FIRST_LAST = "wikipedia_first_last";
public static final String TBL_WITH_NULLS_PARQUET = "tblWnulls.parquet";
public static final String SML_TBL_PARQUET = "smlTbl.parquet";
public static final String ALL_TYPES_UNIQ_PARQUET = "allTypsUniq.parquet";
public static final String FEW_ROWS_ALL_DATA_PARQUET = "fewRowsAllData.parquet";
public static final String T_ALL_TYPE_PARQUET = "t_alltype.parquet";
public static final String TEST_SUPERUSER_NAME = "testSuperuser"; public static final String TEST_SUPERUSER_NAME = "testSuperuser";
public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null) public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null)

View File

@ -19,9 +19,12 @@
package org.apache.druid.sql.calcite.util; package org.apache.druid.sql.calcite.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.inject.Injector; import com.google.inject.Injector;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputRowSchema;
@ -36,8 +39,10 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.DataSource; import org.apache.druid.query.DataSource;
@ -102,6 +107,8 @@ import java.util.stream.Collectors;
*/ */
public class TestDataBuilder public class TestDataBuilder
{ {
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
public static final String TIMESTAMP_COLUMN = "t"; public static final String TIMESTAMP_COLUMN = "t";
public static final GlobalTableDataSource CUSTOM_TABLE = new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE); public static final GlobalTableDataSource CUSTOM_TABLE = new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE);
@ -962,6 +969,175 @@ public class TestDataBuilder
); );
} }
public static void attachIndexesForDrillTestDatasources(SpecificSegmentsQuerySegmentWalker segmentWalker, File tmpDir)
{
attachIndexForDrillTestDatasource(segmentWalker, CalciteTests.TBL_WITH_NULLS_PARQUET, tmpDir);
attachIndexForDrillTestDatasource(segmentWalker, CalciteTests.SML_TBL_PARQUET, tmpDir);
attachIndexForDrillTestDatasource(segmentWalker, CalciteTests.ALL_TYPES_UNIQ_PARQUET, tmpDir);
attachIndexForDrillTestDatasource(segmentWalker, CalciteTests.FEW_ROWS_ALL_DATA_PARQUET, tmpDir);
attachIndexForDrillTestDatasource(segmentWalker, CalciteTests.T_ALL_TYPE_PARQUET, tmpDir);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void attachIndexForDrillTestDatasource(
SpecificSegmentsQuerySegmentWalker segmentWalker,
String dataSource,
File tmpDir
)
{
final QueryableIndex queryableIndex = getQueryableIndexForDrillDatasource(dataSource, tmpDir);
segmentWalker.add(
DataSegment.builder()
.dataSource(dataSource)
.interval(Intervals.ETERNITY)
.version("1")
.shardSpec(new NumberedShardSpec(0, 0))
.size(0)
.build(),
queryableIndex);
}
public static QueryableIndex getQueryableIndexForDrillDatasource(String datasource, File parentTempDir)
{
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(getDimensionSpecForDrillDatasource(datasource))
.withRollup(false)
.build();
Iterable<InputRow> inputRowsForDrillDatasource = getInputRowsForDrillDatasource(datasource);
return IndexBuilder
.create()
.tmpDir(new File(parentTempDir, datasource))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(indexSchema)
.rows(inputRowsForDrillDatasource)
.buildMMappedIndex();
}
private static DimensionsSpec getDimensionSpecForDrillDatasource(String datasource)
{
switch (datasource) {
case CalciteTests.TBL_WITH_NULLS_PARQUET: {
return new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("c1"),
new StringDimensionSchema("c2")
)
);
}
case CalciteTests.SML_TBL_PARQUET: {
return new DimensionsSpec(
ImmutableList.of(
// "col_int": 8122,
new LongDimensionSchema("col_int"),
// "col_bgint": 817200,
new LongDimensionSchema("col_bgint"),
// "col_char_2": "IN",
new StringDimensionSchema("col_char_2"),
// "col_vchar_52":
// "AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXB",
new StringDimensionSchema("col_vchar_52"),
// "col_tmstmp": 1409617682418,
new LongDimensionSchema("col_tmstmp"),
// "col_dt": 422717616000000,
new LongDimensionSchema("col_dt"),
// "col_booln": false,
new StringDimensionSchema("col_booln"),
// "col_dbl": 12900.48,
new DoubleDimensionSchema("col_dbl"),
// "col_tm": 33109170
new LongDimensionSchema("col_tm")
)
);
}
case CalciteTests.ALL_TYPES_UNIQ_PARQUET: {
// {"col0":1,"col1":65534,"col2":256.0,"col3":1234.9,"col4":73578580,"col5":1393720082338,"col6":421185052800000,"col7":false,"col8":"CA","col9":"AXXXXXXXXXXXXXXXXXXXXXXXXXCXXXXXXXXXXXXXXXXXXXXXXXXZ"}
return new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("col0"),
new LongDimensionSchema("col1"),
new DoubleDimensionSchema("col2"),
new DoubleDimensionSchema("col3"),
new LongDimensionSchema("col4"),
new LongDimensionSchema("col5"),
new LongDimensionSchema("col6"),
new StringDimensionSchema("col7"),
new StringDimensionSchema("col8"),
new StringDimensionSchema("col9")
)
);
}
case CalciteTests.FEW_ROWS_ALL_DATA_PARQUET: {
return new DimensionsSpec(
ImmutableList.of(
// "col0":12024,
new LongDimensionSchema("col0"),
// "col1":307168,
new LongDimensionSchema("col1"),
// "col2":"VT",
new StringDimensionSchema("col2"),
// "col3":"DXXXXXXXXXXXXXXXXXXXXXXXXXEXXXXXXXXXXXXXXXXXXXXXXXXF",
new StringDimensionSchema("col3"),
// "col4":1338596882419,
new LongDimensionSchema("col4"),
// "col5":422705433600000,
new LongDimensionSchema("col5"),
// "col6":true,
new StringDimensionSchema("col6"),
// "col7":3.95110006277E8,
new DoubleDimensionSchema("col7"),
// "col8":67465430
new LongDimensionSchema("col8")
)
);
}
case CalciteTests.T_ALL_TYPE_PARQUET: {
return new DimensionsSpec(
ImmutableList.of(
// "c1":1,
new LongDimensionSchema("c1"),
// "c2":592475043,
new LongDimensionSchema("c2"),
// "c3":616080519999272,
new LongDimensionSchema("c3"),
// "c4":"ObHeWTDEcbGzssDwPwurfs",
new StringDimensionSchema("c4"),
// "c5":"0sZxIfZ CGwTOaLWZ6nWkUNx",
new StringDimensionSchema("c5"),
// "c6":1456290852307,
new LongDimensionSchema("c6"),
// "c7":421426627200000,
new LongDimensionSchema("c7"),
// "c8":true,
new StringDimensionSchema("c8"),
// "c9":0.626179100469
new DoubleDimensionSchema("c9")
)
);
}
default:
throw new RuntimeException("Invalid datasource supplied for drill tests");
}
}
private static Iterable<InputRow> getInputRowsForDrillDatasource(String datasource)
{
DimensionsSpec dimensionSpecForDrillDatasource = getDimensionSpecForDrillDatasource(datasource);
return () -> {
try {
return Iterators.transform(
MAPPER.readerFor(Map.class)
.readValues(
ClassLoader.getSystemResource("drill/window/datasources/" + datasource + ".json")),
(Function<Map, InputRow>) input -> new MapBasedInputRow(0, dimensionSpecForDrillDatasource.getDimensionNames(), input)
);
}
catch (IOException e) {
throw new RE(e, "problem reading file");
}
};
}
private static MapBasedInputRow toRow(String time, List<String> dimensions, Map<String, Object> event) private static MapBasedInputRow toRow(String time, List<String> dimensions, Map<String, Object> event)
{ {
return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), dimensions, event); return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), dimensions, event);