diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 5cee2fb2886..f2e600e4264 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -153,6 +153,11 @@ public class OverlordResourceTestClient return getTasks("pendingTasks"); } + public List getCompleteTasksForDataSource(final String dataSource) + { + return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource))); + } + private List getTasks(String identifier) { try { @@ -233,7 +238,14 @@ public class OverlordResourceTestClient { try { StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))), + new Request( + HttpMethod.POST, + new URL(StringUtils.format( + "%ssupervisor/%s/shutdown", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), responseHandler ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java index 501cd5e1398..38f0a86223e 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java @@ -28,6 +28,7 @@ public class TaskResponseObject { private final String id; + private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; private final TaskState status; @@ -35,12 +36,14 @@ public class TaskResponseObject @JsonCreator private TaskResponseObject( @JsonProperty("id") String id, + @JsonProperty("type") String type, @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @JsonProperty("status") TaskState status ) { this.id = id; + this.type = type; this.createdTime = createdTime; this.queueInsertionTime = queueInsertionTime; this.status = status; @@ -52,6 +55,12 @@ public class TaskResponseObject return id; } + @SuppressWarnings("unused") // Used by Jackson serialization? + public String getType() + { + return type; + } + @SuppressWarnings("unused") // Used by Jackson serialization? public DateTime getCreatedTime() { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 226824f476f..121fd7a9bda 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -160,10 +160,25 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest { final Set oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null; + long startSubTaskCount = -1; + final boolean assertRunsSubTasks = taskSpec.contains("index_parallel"); + if (assertRunsSubTasks) { + startSubTaskCount = countCompleteSubTasks(dataSourceName); + } + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); + if (assertRunsSubTasks) { + final long newSubTasks = countCompleteSubTasks(dataSourceName) - startSubTaskCount; + Assert.assertTrue( + StringUtils.format( + "The supervisor task[%s] didn't create any sub tasks. Was it executed in the parallel mode?", + taskID + ), newSubTasks > 0); + } + // ITParallelIndexTest does a second round of ingestion to replace segements in an existing // data source. For that second round we need to make sure the coordinator actually learned // about the new segments befor waiting for it to report that all segments are loaded; otherwise @@ -179,4 +194,12 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load" ); } + + private long countCompleteSubTasks(final String dataSource) + { + return indexer.getCompleteTasksForDataSource(dataSource) + .stream() + .filter(t -> t.getType().equals("index_sub")) + .count(); + } } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json index f317c538f6b..887508ad7e9 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json @@ -61,6 +61,10 @@ "baseDir": "/resources/data/batch_index", "filter": "wikipedia_index_data*" } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumSubTasks": 10 } } } \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json index c06890bfde4..ef16c648cb8 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json @@ -60,6 +60,10 @@ "baseDir": "/resources/data/batch_index", "filter": "wikipedia_index_data2*" } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumSubTasks": 10 } } } \ No newline at end of file