mirror of https://github.com/apache/druid.git
integration-tests: make ITParallelIndexTest still work in parallel (#7211)
* integration-tests: make ITParallelIndexTest still work in parallel Follow-up to #7181, which made the default behavior for index_parallel tasks non-parallel. * Validate that parallel index subtasks were run
This commit is contained in:
parent
3ed250787d
commit
de55905a5f
|
@ -153,6 +153,11 @@ public class OverlordResourceTestClient
|
|||
return getTasks("pendingTasks");
|
||||
}
|
||||
|
||||
public List<TaskResponseObject> getCompleteTasksForDataSource(final String dataSource)
|
||||
{
|
||||
return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
|
||||
}
|
||||
|
||||
private List<TaskResponseObject> getTasks(String identifier)
|
||||
{
|
||||
try {
|
||||
|
@ -233,7 +238,14 @@ public class OverlordResourceTestClient
|
|||
{
|
||||
try {
|
||||
StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))),
|
||||
new Request(
|
||||
HttpMethod.POST,
|
||||
new URL(StringUtils.format(
|
||||
"%ssupervisor/%s/shutdown",
|
||||
getIndexerURL(),
|
||||
StringUtils.urlEncode(id)
|
||||
))
|
||||
),
|
||||
responseHandler
|
||||
).get();
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
|
|
|
@ -28,6 +28,7 @@ public class TaskResponseObject
|
|||
{
|
||||
|
||||
private final String id;
|
||||
private final String type;
|
||||
private final DateTime createdTime;
|
||||
private final DateTime queueInsertionTime;
|
||||
private final TaskState status;
|
||||
|
@ -35,12 +36,14 @@ public class TaskResponseObject
|
|||
@JsonCreator
|
||||
private TaskResponseObject(
|
||||
@JsonProperty("id") String id,
|
||||
@JsonProperty("type") String type,
|
||||
@JsonProperty("createdTime") DateTime createdTime,
|
||||
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
|
||||
@JsonProperty("status") TaskState status
|
||||
)
|
||||
{
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.createdTime = createdTime;
|
||||
this.queueInsertionTime = queueInsertionTime;
|
||||
this.status = status;
|
||||
|
@ -52,6 +55,12 @@ public class TaskResponseObject
|
|||
return id;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused") // Used by Jackson serialization?
|
||||
public String getType()
|
||||
{
|
||||
return type;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused") // Used by Jackson serialization?
|
||||
public DateTime getCreatedTime()
|
||||
{
|
||||
|
|
|
@ -160,10 +160,25 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
{
|
||||
final Set<String> oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null;
|
||||
|
||||
long startSubTaskCount = -1;
|
||||
final boolean assertRunsSubTasks = taskSpec.contains("index_parallel");
|
||||
if (assertRunsSubTasks) {
|
||||
startSubTaskCount = countCompleteSubTasks(dataSourceName);
|
||||
}
|
||||
|
||||
final String taskID = indexer.submitTask(taskSpec);
|
||||
LOG.info("TaskID for loading index task %s", taskID);
|
||||
indexer.waitUntilTaskCompletes(taskID);
|
||||
|
||||
if (assertRunsSubTasks) {
|
||||
final long newSubTasks = countCompleteSubTasks(dataSourceName) - startSubTaskCount;
|
||||
Assert.assertTrue(
|
||||
StringUtils.format(
|
||||
"The supervisor task[%s] didn't create any sub tasks. Was it executed in the parallel mode?",
|
||||
taskID
|
||||
), newSubTasks > 0);
|
||||
}
|
||||
|
||||
// ITParallelIndexTest does a second round of ingestion to replace segements in an existing
|
||||
// data source. For that second round we need to make sure the coordinator actually learned
|
||||
// about the new segments befor waiting for it to report that all segments are loaded; otherwise
|
||||
|
@ -179,4 +194,12 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
() -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
|
||||
);
|
||||
}
|
||||
|
||||
private long countCompleteSubTasks(final String dataSource)
|
||||
{
|
||||
return indexer.getCompleteTasksForDataSource(dataSource)
|
||||
.stream()
|
||||
.filter(t -> t.getType().equals("index_sub"))
|
||||
.count();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,10 @@
|
|||
"baseDir": "/resources/data/batch_index",
|
||||
"filter": "wikipedia_index_data*"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "index_parallel",
|
||||
"maxNumSubTasks": 10
|
||||
}
|
||||
}
|
||||
}
|
|
@ -60,6 +60,10 @@
|
|||
"baseDir": "/resources/data/batch_index",
|
||||
"filter": "wikipedia_index_data2*"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "index_parallel",
|
||||
"maxNumSubTasks": 10
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue