From 06d688eb74657f767f521f0ee7ce3ee000ba03ae Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 2 Feb 2017 12:57:09 +0100 Subject: [PATCH] AutodetectProcessManager#getStatistics(...) should can now just return stats for single job as the _all expension is done on the transport layer Original commit: elastic/x-pack-elasticsearch@02d5272a4ef2f17628cdc57ec2320feb1932b42e --- .../xpack/ml/action/GetJobsStatsAction.java | 15 ++++++++++----- .../autodetect/AutodetectProcessManager.java | 16 +++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java index 9e1ba78a37b..4fe23cfde9d 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -47,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -344,11 +346,14 @@ public class GetJobsStatsAction extends Action> listener) { logger.debug("Get stats for job '{}'", request.getJobId()); MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE); - List stats = processManager.getStatistics(request.getJobId()).map(t -> { - String jobId = t.v1().getJobid(); - return new Response.JobStats(jobId, t.v1(), t.v2(), mlMetadata.getAllocations().get(jobId).getStatus()); - }).collect(Collectors.toList()); - listener.onResponse(new QueryPage<>(stats, stats.size(), Job.RESULTS_FIELD)); + Optional> stats = processManager.getStatistics(request.getJobId()); + if (stats.isPresent()) { + JobStatus jobStatus = mlMetadata.getAllocations().get(request.jobId).getStatus(); + Response.JobStats jobStats = new Response.JobStats(request.jobId, stats.get().v1(), stats.get().v2(), jobStatus); + listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); + } else { + listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD)); + } } // Up until now we gathered the stats for jobs that were open, diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index e184467b6d5..c2336f6c1de 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -47,7 +47,7 @@ import java.io.InputStream; import java.time.Duration; import java.time.ZonedDateTime; import java.util.Locale; -import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -56,7 +56,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.stream.Stream; public class AutodetectProcessManager extends AbstractComponent { @@ -338,12 +337,11 @@ public class AutodetectProcessManager extends AbstractComponent { client.execute(UpdateJobStatusAction.INSTANCE, request, ActionListener.wrap(r -> handler.accept(null), errorHandler)); } - public Stream> getStatistics(String jobId) { - return autoDetectCommunicatorByJob.entrySet().stream() - .filter(entry -> jobId.equals(entry.getKey())) - .map(Map.Entry::getValue) - .map(autodetectCommunicator -> { - return new Tuple<>(autodetectCommunicator.getDataCounts(), autodetectCommunicator.getModelSizeStats()); - }); + public Optional> getStatistics(String jobId) { + AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); + if (communicator == null) { + return Optional.empty(); + } + return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats())); } }