Add framework for running MSQ tests with taskSpec instead of SQL (#16970)

* Add framework for running MSQ tests with taskSpec instead of SQL

* Allow configurable datasegment for tests

* Add test

* Revert "Add test"

This reverts commit 79fb241545.

* Revert "Allow configurable datasegment for tests"

This reverts commit caf04ede2b.
This commit is contained in:
Adarsh Sanjeev 2024-09-09 11:38:28 +05:30 committed by GitHub
parent 37d4174245
commit 616c46c958
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 172 additions and 13 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
@ -34,6 +35,11 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
@ -43,11 +49,20 @@ import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
@ -95,6 +110,7 @@ public class MSQInsertTest extends MSQTestBase
};
return Arrays.asList(data);
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnFoo1(String contextName, Map<String, Object> context)
@ -155,6 +171,105 @@ public class MSQInsertTest extends MSQTestBase
}
/**
 * Inserts from datasource "foo" into "foo1" by handing the test harness a pre-built
 * {@link MSQControllerTask} (through {@code setTaskSpec}) instead of a SQL string, then checks the
 * produced segments, rows, segment report, segment-generation counters and lookup loading spec.
 *
 * @param contextName display name of the parameterized context (used only in the test name)
 * @param context     query context supplied by the {@code data} method source
 */
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnFoo1WithSpec(String contextName, Map<String, Object> context)
{
  final List<Object[]> expectedRows = expectedFooRows();
  final long[] expectedFrames = createExpectedFrameArray(expectedRows.size(), 1);

  final RowSignature rowSignature =
      RowSignature.builder()
                  .add("__time", ColumnType.LONG)
                  .add("dim1", ColumnType.STRING)
                  .add("cnt", ColumnType.LONG)
                  .build();

  // Context attached to the native group-by query itself (distinct from the task-level SQL context below).
  final ImmutableMap<String, Object> nativeQueryContext =
      ImmutableMap.<String, Object>builder()
                  .put("__user", "allowAll")
                  .put("enableWindowing", true)
                  .put("finalize", true)
                  .put("maxNumTasks", 2)
                  .put("maxParseExceptions", 0)
                  .put("sqlInsertSegmentGranularity", "\"DAY\"")
                  .put("sqlQueryId", "test-query")
                  .put("sqlStringifyArrays", false)
                  .build();

  final DefaultLimitSpec orderByDim1 =
      DefaultLimitSpec.builder()
                      .orderBy(OrderByColumnSpec.asc("d1"))
                      .build();

  // Builder calls are kept in their original order, since later setters may override earlier ones.
  final GroupByQuery groupByQuery =
      GroupByQuery.builder()
                  .setDataSource("foo")
                  .setInterval(Intervals.ONLY_ETERNITY)
                  .setDimFilter(notNull("dim1"))
                  .setGranularity(Granularities.ALL)
                  .setDimensions(
                      new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
                      new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING)
                  )
                  .setContext(nativeQueryContext)
                  .setLimitSpec(orderByDim1)
                  .setAggregatorSpecs(new CountAggregatorFactory("a0"))
                  .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
                  .build();

  // Maps the query's output names (d0, d1, a0) onto the destination column names.
  final ColumnMappings columnMappings = new ColumnMappings(
      ImmutableList.of(
          new ColumnMapping("d0", "__time"),
          new ColumnMapping("d1", "dim1"),
          new ColumnMapping("a0", "cnt")
      )
  );

  final DataSourceMSQDestination destination = new DataSourceMSQDestination(
      "foo1",
      Granularity.fromString("DAY"),
      null,
      null,
      null,
      null
  );

  final MSQSpec msqSpec = new MSQSpec(
      groupByQuery,
      columnMappings,
      destination,
      WorkerAssignmentStrategy.MAX,
      MSQTuningConfig.defaultConfig()
  );

  final ImmutableMap<String, Object> taskSqlContext =
      ImmutableMap.<String, Object>builder()
                  .putAll(context)
                  .put("sqlInsertSegmentGranularity", "\"DAY\"")
                  .put("forceTimeChunkLock", true)
                  .build();

  final MSQControllerTask controllerTask = new MSQControllerTask(
      TEST_CONTROLLER_TASK_ID,
      msqSpec,
      null,
      taskSqlContext,
      null,
      ImmutableList.of(SqlTypeName.TIMESTAMP, SqlTypeName.VARCHAR, SqlTypeName.BIGINT),
      ImmutableList.of(ColumnType.LONG, ColumnType.STRING, ColumnType.LONG),
      null
  );

  final long totalSegmentRows = Arrays.stream(expectedFrames).sum();
  final MSQSegmentReport expectedSegmentReport = new MSQSegmentReport(
      NumberedShardSpec.class.getSimpleName(),
      "Using NumberedShardSpec to generate segments since the query is inserting rows."
  );

  testIngestQuery()
      .setTaskSpec(controllerTask)
      .setExpectedDataSource("foo1")
      .setQueryContext(context)
      .setExpectedRowSignature(rowSignature)
      .setExpectedSegments(expectedFooSegments())
      .setExpectedResultRows(expectedRows)
      .setExpectedMSQSegmentReport(expectedSegmentReport)
      .setExpectedSegmentGenerationProgressCountersForStageWorker(
          CounterSnapshotMatcher.with().segmentRowsProcessed(totalSegmentRows),
          2,
          0
      )
      .setExpectedLookupLoadingSpec(LookupLoadingSpec.ALL)
      .verifyResults();
}
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertWithExistingTimeColumn(String contextName, Map<String, Object> context) throws IOException

View File

@ -2324,7 +2324,7 @@ public class MSQSelectTest extends MSQTestBase
if (DURABLE_STORAGE.equals(contextName) || FAULT_TOLERANCE.equals(contextName)) {
new File(
localFileStorageDir,
DurableStorageUtils.getWorkerOutputSuccessFilePath("query-test-query", 0, 0)
DurableStorageUtils.getWorkerOutputSuccessFilePath(TEST_CONTROLLER_TASK_ID, 0, 0)
);
Mockito.verify(localFileStorageConnector, Mockito.times(2))

View File

@ -336,6 +336,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
private TestGroupByBuffers groupByBuffers;
protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(makeTestWorkerMemoryParameters());
/**
 * Controller task id used by tests. When a tester is given a task spec instead of SQL, the task is
 * submitted under this id and results/errors are looked up with it; tests also use it to build
 * expected durable-storage paths.
 */
protected static final String TEST_CONTROLLER_TASK_ID = "query-test-query";
protected static class MSQBaseComponentSupplier extends StandardComponentSupplier
{
@ -861,6 +862,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
public abstract class MSQTester<Builder extends MSQTester<Builder>>
{
protected String sql = null;
/**
 * Controller task to submit directly instead of running {@link #sql}. The verify methods require
 * that exactly one of {@code sql} and {@code taskSpec} is non-null.
 */
protected MSQControllerTask taskSpec = null;
protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
protected List<MSQResultsReport.ColumnAndType> expectedRowSignature = null;
protected MSQSpec expectedMSQSpec = null;
@ -888,6 +890,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
/**
 * Configures this tester to run the given controller task directly rather than a SQL statement.
 * The verify methods reject a tester that has both SQL and a task spec set, so use this instead of
 * (not together with) the SQL setter.
 *
 * @param taskSpec the controller task to submit
 * @return this builder, for chaining
 */
public Builder setTaskSpec(MSQControllerTask taskSpec)
{
this.taskSpec = taskSpec;
return asBuilder();
}
public Builder setQueryContext(Map<String, Object> queryContext)
{
this.queryContext = queryContext;
@ -1152,8 +1160,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
public void verifyResults()
{
Preconditions.checkArgument(sql != null, "sql cannot be null");
Preconditions.checkArgument(queryContext != null, "queryContext cannot be null");
Preconditions.checkArgument(
sql != null || taskSpec != null,
"sql and taskSpec both cannot be null"
);
Preconditions.checkArgument(
sql == null || taskSpec == null,
"sql and taskSpec both cannot be provided in the same test"
);
Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null");
Preconditions.checkArgument(
(expectedResultRows != null && expectedResultRows.isEmpty()) || expectedDataSource != null,
"dataSource cannot be null when expectedResultRows is non-empty"
@ -1169,7 +1184,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass cannot be null");
readyToRun();
try {
String controllerId = runMultiStageQuery(sql, queryContext);
String controllerId;
if (sql != null) {
// Run the sql command.
controllerId = runMultiStageQuery(sql, queryContext);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
indexingServiceClient.runTask(controllerId, taskSpec);
}
if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
if (expectedMSQFault != null) {
@ -1362,18 +1385,32 @@ public class MSQTestBase extends BaseCalciteQueryTest
assertResultsEquals(sql, expectedResultRows, transformedOutputRows);
}
catch (Exception e) {
throw new ISE(e, "Query %s failed", sql);
throw new ISE(e, "Query %s failed", sql != null ? sql : taskSpec);
}
}
public void verifyExecutionError()
{
Preconditions.checkArgument(sql != null, "sql cannot be null");
Preconditions.checkArgument(queryContext != null, "queryContext cannot be null");
Preconditions.checkArgument(
sql != null || taskSpec != null,
"sql and taskSpec both cannot be null"
);
Preconditions.checkArgument(
sql == null || taskSpec == null,
"sql and taskSpec both cannot be provided in the same test"
);
Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null");
Preconditions.checkArgument(expectedExecutionErrorMatcher != null, "Execution error matcher cannot be null");
readyToRun();
try {
String controllerId = runMultiStageQuery(sql, queryContext);
String controllerId;
if (sql != null) {
controllerId = runMultiStageQuery(sql, queryContext);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
indexingServiceClient.runTask(controllerId, taskSpec);
}
getPayloadOrThrow(controllerId);
Assert.fail(StringUtils.format("Query did not throw an exception (sql = [%s])", sql));
}
@ -1399,8 +1436,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>> runQueryWithResult()
{
readyToRun();
Preconditions.checkArgument(sql != null, "sql cannot be null");
Preconditions.checkArgument(queryContext != null, "queryContext cannot be null");
Preconditions.checkArgument(
sql != null || taskSpec != null,
"sql and taskSpec both cannot be null"
);
Preconditions.checkArgument(
sql == null || taskSpec == null,
"sql and taskSpec both cannot be provided in the same test"
);
Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null");
try {
String controllerId = runMultiStageQuery(sql, queryContext);
@ -1496,7 +1540,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
catch (Exception e) {
if (expectedExecutionErrorMatcher == null) {
throw new ISE(e, "Query %s failed", sql);
throw new ISE(e, "Query %s failed", sql != null ? sql : taskSpec);
}
assertThat(e, expectedExecutionErrorMatcher);
return null;
@ -1517,7 +1561,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
Assert.assertEquals(expectedRowSignature, specAndResults.rhs.lhs);
assertResultsEquals(sql, expectedResultRows, specAndResults.rhs.rhs);
assertResultsEquals(sql != null ? sql : taskSpec.toString(), expectedResultRows, specAndResults.rhs.rhs);
assertMSQSpec(expectedMSQSpec, specAndResults.lhs);
}
@ -1525,7 +1569,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
{
Preconditions.checkArgument(expectedExecutionErrorMatcher != null, "Execution error matcher cannot be null");
if (runQueryWithResult() != null) {
throw new ISE("Query %s did not throw an exception", sql);
throw new ISE("Query %s did not throw an exception", sql != null ? sql : taskSpec);
}
}
}