AutodetectProcessManager#getStatistics(...) should can now just return stats for single job as the _all expension is done on the transport layer

Original commit: elastic/x-pack-elasticsearch@02d5272a4e
This commit is contained in:
Martijn van Groningen 2017-02-02 12:57:09 +01:00
parent 5ba9a6cfcc
commit 06d688eb74
2 changed files with 17 additions and 14 deletions

View File

@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -47,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -344,11 +346,14 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
ActionListener<QueryPage<Response.JobStats>> listener) { ActionListener<QueryPage<Response.JobStats>> listener) {
logger.debug("Get stats for job '{}'", request.getJobId()); logger.debug("Get stats for job '{}'", request.getJobId());
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
List<Response.JobStats> stats = processManager.getStatistics(request.getJobId()).map(t -> { Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(request.getJobId());
String jobId = t.v1().getJobid(); if (stats.isPresent()) {
return new Response.JobStats(jobId, t.v1(), t.v2(), mlMetadata.getAllocations().get(jobId).getStatus()); JobStatus jobStatus = mlMetadata.getAllocations().get(request.jobId).getStatus();
}).collect(Collectors.toList()); Response.JobStats jobStats = new Response.JobStats(request.jobId, stats.get().v1(), stats.get().v2(), jobStatus);
listener.onResponse(new QueryPage<>(stats, stats.size(), Job.RESULTS_FIELD)); listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
} else {
listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD));
}
} }
// Up until now we gathered the stats for jobs that were open, // Up until now we gathered the stats for jobs that were open,

View File

@ -47,7 +47,7 @@ import java.io.InputStream;
import java.time.Duration; import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -56,7 +56,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream;
public class AutodetectProcessManager extends AbstractComponent { public class AutodetectProcessManager extends AbstractComponent {
@ -338,12 +337,11 @@ public class AutodetectProcessManager extends AbstractComponent {
client.execute(UpdateJobStatusAction.INSTANCE, request, ActionListener.wrap(r -> handler.accept(null), errorHandler)); client.execute(UpdateJobStatusAction.INSTANCE, request, ActionListener.wrap(r -> handler.accept(null), errorHandler));
} }
public Stream<Tuple<DataCounts, ModelSizeStats>> getStatistics(String jobId) { public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(String jobId) {
return autoDetectCommunicatorByJob.entrySet().stream() AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
.filter(entry -> jobId.equals(entry.getKey())) if (communicator == null) {
.map(Map.Entry::getValue) return Optional.empty();
.map(autodetectCommunicator -> { }
return new Tuple<>(autodetectCommunicator.getDataCounts(), autodetectCommunicator.getModelSizeStats()); return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats()));
});
} }
} }