diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 4449090fb69..4d61555c80f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; @@ -34,6 +35,11 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.msq.indexing.MSQControllerTask; +import org.apache.druid.msq.indexing.MSQSpec; +import org.apache.druid.msq.indexing.MSQTuningConfig; +import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault; import org.apache.druid.msq.indexing.error.RowTooLargeFault; import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault; @@ -43,11 +49,20 @@ import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; +import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.hamcrest.CoreMatchers; @@ -95,6 +110,7 @@ public class MSQInsertTest extends MSQTestBase }; return Arrays.asList(data); } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testInsertOnFoo1(String contextName, Map context) @@ -155,6 +171,105 @@ public class MSQInsertTest extends MSQTestBase } + @MethodSource("data") + @ParameterizedTest(name = "{index}:with context {0}") + public void testInsertOnFoo1WithSpec(String contextName, Map context) + { + List expectedRows = expectedFooRows(); + int expectedCounterRows = expectedRows.size(); + long[] expectedArray = createExpectedFrameArray(expectedCounterRows, 1); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + MSQSpec msqSpec = new MSQSpec( + GroupByQuery.builder() + .setDataSource("foo") + .setInterval(Intervals.ONLY_ETERNITY) + .setDimFilter(notNull("dim1")) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), + new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING) + ) + .setContext(ImmutableMap.builder() + .put("__user", "allowAll") + .put("enableWindowing", true) + .put("finalize", true) + .put("maxNumTasks", 2) + .put("maxParseExceptions", 0) + .put("sqlInsertSegmentGranularity", "\"DAY\"") + .put("sqlQueryId", "test-query") + .put("sqlStringifyArrays", false) + .build() + ) + .setLimitSpec(DefaultLimitSpec.builder() + .orderBy(OrderByColumnSpec.asc("d1")) + .build() + ) + .setAggregatorSpecs(new CountAggregatorFactory("a0")) + .setQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .build(), + new ColumnMappings( + ImmutableList.of( + new ColumnMapping("d0", "__time"), + new ColumnMapping("d1", "dim1"), + new ColumnMapping("a0", "cnt")) + ), + new DataSourceMSQDestination( + "foo1", + Granularity.fromString("DAY"), + null, + null, + null, + null + ), + WorkerAssignmentStrategy.MAX, + MSQTuningConfig.defaultConfig() + ); + + ImmutableMap sqlContext = + ImmutableMap.builder() + .putAll(context) + .put("sqlInsertSegmentGranularity", "\"DAY\"") + .put("forceTimeChunkLock", true) + .build(); + + MSQControllerTask controllerTask = new MSQControllerTask( + TEST_CONTROLLER_TASK_ID, + msqSpec, + null, + sqlContext, + null, + ImmutableList.of(SqlTypeName.TIMESTAMP, SqlTypeName.VARCHAR, SqlTypeName.BIGINT), + ImmutableList.of(ColumnType.LONG, ColumnType.STRING, ColumnType.LONG), + null + ); + + testIngestQuery().setTaskSpec(controllerTask) + .setExpectedDataSource("foo1") + .setQueryContext(context) + .setExpectedRowSignature(rowSignature) + .setExpectedSegments(expectedFooSegments()) + .setExpectedResultRows(expectedRows) + .setExpectedMSQSegmentReport( + new MSQSegmentReport( + NumberedShardSpec.class.getSimpleName(), + "Using NumberedShardSpec to generate segments since the query is inserting rows." + ) + ) + .setExpectedSegmentGenerationProgressCountersForStageWorker( + CounterSnapshotMatcher + .with().segmentRowsProcessed(Arrays.stream(expectedArray).sum()), + 2, 0 + ) + .setExpectedLookupLoadingSpec(LookupLoadingSpec.ALL) + .verifyResults(); + + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testInsertWithExistingTimeColumn(String contextName, Map context) throws IOException diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 305cdebf691..45ea0b2357c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -2324,7 +2324,7 @@ public class MSQSelectTest extends MSQTestBase if (DURABLE_STORAGE.equals(contextName) || FAULT_TOLERANCE.equals(contextName)) { new File( localFileStorageDir, - DurableStorageUtils.getWorkerOutputSuccessFilePath("query-test-query", 0, 0) + DurableStorageUtils.getWorkerOutputSuccessFilePath(TEST_CONTROLLER_TASK_ID, 0, 0) ); Mockito.verify(localFileStorageConnector, Mockito.times(2)) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index bb8691b9322..d3ffeda743b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -336,6 +336,7 @@ public class MSQTestBase extends BaseCalciteQueryTest private TestGroupByBuffers groupByBuffers; protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(makeTestWorkerMemoryParameters()); + protected static final String TEST_CONTROLLER_TASK_ID = "query-test-query"; protected static class MSQBaseComponentSupplier extends StandardComponentSupplier { @@ -861,6 +862,7 @@ public class MSQTestBase extends BaseCalciteQueryTest public abstract class MSQTester> { protected String sql = null; + protected MSQControllerTask taskSpec = null; protected Map queryContext = DEFAULT_MSQ_CONTEXT; protected List expectedRowSignature = null; protected MSQSpec expectedMSQSpec = null; @@ -888,6 +890,12 @@ public class MSQTestBase extends BaseCalciteQueryTest return asBuilder(); } + public Builder setTaskSpec(MSQControllerTask taskSpec) + { + this.taskSpec = taskSpec; + return asBuilder(); + } + public Builder setQueryContext(Map queryContext) { this.queryContext = queryContext; @@ -1152,8 +1160,15 @@ public class MSQTestBase extends BaseCalciteQueryTest public void verifyResults() { - Preconditions.checkArgument(sql != null, "sql cannot be null"); - Preconditions.checkArgument(queryContext != null, "queryContext cannot be null"); + Preconditions.checkArgument( + sql != null || taskSpec != null, + "sql and taskSpec both cannot be null" + ); + Preconditions.checkArgument( + sql == null || taskSpec == null, + "sql and taskSpec both cannot be provided in the same test" + ); + Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null"); Preconditions.checkArgument( (expectedResultRows != null && expectedResultRows.isEmpty()) || expectedDataSource != null, "dataSource cannot be null when expectedResultRows is non-empty" @@ -1169,7 +1184,15 @@ public class MSQTestBase extends BaseCalciteQueryTest Preconditions.checkArgument(expectedShardSpec != null, "shardSpecClass cannot be null"); readyToRun(); try { - String controllerId = runMultiStageQuery(sql, queryContext); + String controllerId; + if (sql != null) { + // Run the sql command. + controllerId = runMultiStageQuery(sql, queryContext); + } else { + // Run the task spec directly instead. + controllerId = TEST_CONTROLLER_TASK_ID; + indexingServiceClient.runTask(controllerId, taskSpec); + } if (expectedMSQFault != null || expectedMSQFaultClass != null) { MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId); if (expectedMSQFault != null) { @@ -1362,18 +1385,32 @@ public class MSQTestBase extends BaseCalciteQueryTest assertResultsEquals(sql, expectedResultRows, transformedOutputRows); } catch (Exception e) { - throw new ISE(e, "Query %s failed", sql); + throw new ISE(e, "Query %s failed", sql != null ? sql : taskSpec); } } public void verifyExecutionError() { - Preconditions.checkArgument(sql != null, "sql cannot be null"); - Preconditions.checkArgument(queryContext != null, "queryContext cannot be null"); + Preconditions.checkArgument( + sql != null || taskSpec != null, + "sql and taskSpec both cannot be null" + ); + Preconditions.checkArgument( + sql == null || taskSpec == null, + "sql and taskSpec both cannot be provided in the same test" + ); + Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null"); Preconditions.checkArgument(expectedExecutionErrorMatcher != null, "Execution error matcher cannot be null"); readyToRun(); try { - String controllerId = runMultiStageQuery(sql, queryContext); + String controllerId; + if (sql != null) { + controllerId = runMultiStageQuery(sql, queryContext); + } else { + // Run the task spec directly instead. + controllerId = TEST_CONTROLLER_TASK_ID; + indexingServiceClient.runTask(controllerId, taskSpec); + } getPayloadOrThrow(controllerId); Assert.fail(StringUtils.format("Query did not throw an exception (sql = [%s])", sql)); } @@ -1399,8 +1436,15 @@ public class MSQTestBase extends BaseCalciteQueryTest public Pair, List>> runQueryWithResult() { readyToRun(); - Preconditions.checkArgument(sql != null, "sql cannot be null"); - Preconditions.checkArgument(queryContext != null, "queryContext cannot be null"); + Preconditions.checkArgument( + sql != null || taskSpec != null, + "sql and taskSpec both cannot be null" + ); + Preconditions.checkArgument( + sql == null || taskSpec == null, + "sql and taskSpec both cannot be provided in the same test" + ); + Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null"); try { String controllerId = runMultiStageQuery(sql, queryContext); @@ -1496,7 +1540,7 @@ public class MSQTestBase extends BaseCalciteQueryTest } catch (Exception e) { if (expectedExecutionErrorMatcher == null) { - throw new ISE(e, "Query %s failed", sql); + throw new ISE(e, "Query %s failed", sql != null ? sql : taskSpec); } assertThat(e, expectedExecutionErrorMatcher); return null; @@ -1517,7 +1561,7 @@ public class MSQTestBase extends BaseCalciteQueryTest } Assert.assertEquals(expectedRowSignature, specAndResults.rhs.lhs); - assertResultsEquals(sql, expectedResultRows, specAndResults.rhs.rhs); + assertResultsEquals(sql != null ? sql : taskSpec.toString(), expectedResultRows, specAndResults.rhs.rhs); assertMSQSpec(expectedMSQSpec, specAndResults.lhs); } @@ -1525,7 +1569,7 @@ public class MSQTestBase extends BaseCalciteQueryTest { Preconditions.checkArgument(expectedExecutionErrorMatcher != null, "Execution error matcher cannot be null"); if (runQueryWithResult() != null) { - throw new ISE("Query %s did not throw an exception", sql); + throw new ISE("Query %s did not throw an exception", sql != null ? sql : taskSpec); } } }