[ML] Correctly return job stats for _all (elastic/x-pack-elasticsearch#540)
Original commit: elastic/x-pack-elasticsearch@a286ea36e9
This commit is contained in:
parent
1b4fec642c
commit
3e43b591df
|
@ -50,6 +50,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -168,7 +169,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
|||
state = JobState.fromStream(in);
|
||||
}
|
||||
|
||||
public String getJobid() {
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
|
@ -345,12 +346,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
|||
@Override
|
||||
protected void taskOperation(Request request, OpenJobAction.JobTask task,
|
||||
ActionListener<QueryPage<Response.JobStats>> listener) {
|
||||
logger.debug("Get stats for job '{}'", request.getJobId());
|
||||
String jobId = task.getJobId();
|
||||
logger.debug("Get stats for job '{}'", jobId);
|
||||
PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE);
|
||||
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(request.getJobId());
|
||||
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(jobId);
|
||||
if (stats.isPresent()) {
|
||||
JobState jobState = MlMetadata.getJobState(request.jobId, tasks);
|
||||
Response.JobStats jobStats = new Response.JobStats(request.jobId, stats.get().v1(), stats.get().v2(), jobState);
|
||||
JobState jobState = MlMetadata.getJobState(jobId, tasks);
|
||||
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState);
|
||||
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
|
||||
} else {
|
||||
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
|
||||
|
@ -397,18 +399,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
|||
}
|
||||
|
||||
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds, List<Response.JobStats> stats) {
|
||||
List<String> jobIds = new ArrayList<>();
|
||||
outer: for (String jobId : requestedJobIds) {
|
||||
for (Response.JobStats stat : stats) {
|
||||
if (stat.getJobid().equals(jobId)) {
|
||||
// we already have stats, no need to get stats for this job from an index
|
||||
continue outer;
|
||||
}
|
||||
}
|
||||
jobIds.add(jobId);
|
||||
}
|
||||
return jobIds;
|
||||
Set<String> excludeJobIds = stats.stream().map(s -> s.getJobId()).collect(Collectors.toSet());
|
||||
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -199,10 +199,16 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
|
|||
|
||||
public static class JobTask extends PersistentTask {
|
||||
|
||||
private final String jobId;
|
||||
private volatile Consumer<String> cancelHandler;
|
||||
|
||||
JobTask(String jobId, long id, String type, String action, TaskId parentTask) {
|
||||
super(id, type, action, "job-" + jobId, parentTask);
|
||||
this.jobId = jobId;
|
||||
}
|
||||
|
||||
public String getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -126,6 +126,8 @@ setup:
|
|||
xpack.ml.get_job_stats:
|
||||
job_id: _all
|
||||
- match: { count: 2 }
|
||||
- match: { jobs.0.state: opened }
|
||||
- match: { jobs.1.state: opened }
|
||||
|
||||
---
|
||||
"Test get all job stats implicitly":
|
||||
|
|
Loading…
Reference in New Issue