diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java index 4b593d80462..6f526c7beb1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetDatafeedsStatsAction.java @@ -43,9 +43,12 @@ import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class GetDatafeedsStatsAction extends Action { @@ -299,34 +302,44 @@ public class GetDatafeedsStatsAction extends Action listener) throws Exception { + protected void masterOperation(Request request, ClusterState state, + ActionListener listener) throws Exception { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - Map results = new HashMap<>(); MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); - PersistentTasks tasksInProgress = state.getMetaData().custom(PersistentTasks.TYPE); - if (request.getDatafeedId().equals(ALL) == false && mlMetadata.getDatafeed(request.getDatafeedId()) == null) { + + if (request.getDatafeedId().equals(ALL) == false + && mlMetadata.getDatafeed(request.getDatafeedId()) == null) { throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); } - for (DatafeedConfig datafeedConfig : mlMetadata.getDatafeeds().values()) { - if (request.getDatafeedId().equals(ALL) || datafeedConfig.getId().equals(request.getDatafeedId())) { - PersistentTask task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress); - DatafeedState datafeedState = MlMetadata.getDatafeedState(request.getDatafeedId(), tasksInProgress); - DiscoveryNode node = null; - String explanation = null; - if (task != null) { - node = state.nodes().get(task.getExecutorNode()); - explanation = task.getAssignment().getExplanation(); - } - results.put(datafeedConfig.getId(), new DatafeedStats(datafeedConfig.getId(), datafeedState, node, explanation)); - } - } - QueryPage statsPage = new QueryPage<>(new ArrayList<>(results.values()), results.size(), + List expandedDatafeedsIds = request.getDatafeedId().equals(ALL) ? + mlMetadata.getDatafeeds().values().stream() + .map(d -> d.getId()).collect(Collectors.toList()) + : Collections.singletonList(request.getDatafeedId()); + + PersistentTasks tasksInProgress = state.getMetaData().custom(PersistentTasks.TYPE); + List results = expandedDatafeedsIds.stream() + .map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress)) + .collect(Collectors.toList()); + QueryPage statsPage = new QueryPage<>(results, results.size(), DatafeedConfig.RESULTS_FIELD); listener.onResponse(new Response(statsPage)); } + private static DatafeedStats getDatafeedStats(String datafeedId, ClusterState state, + PersistentTasks tasks) { + PersistentTask task = MlMetadata.getDatafeedTask(datafeedId, tasks); + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); + DiscoveryNode node = null; + String explanation = null; + if (task != null) { + node = state.nodes().get(task.getExecutorNode()); + explanation = task.getAssignment().getExplanation(); + } + return new DatafeedStats(datafeedId, datafeedState, node, explanation); + } + @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml index 310654e2b98..945c5dcbe74 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yaml @@ -98,7 +98,36 @@ setup: - match: { datafeeds.0.node.attributes.max_running_jobs: "10"} --- -"Test explicit get all datafeed stats": +"Test implicit get all datafeed stats given started datafeeds": + + - do: + xpack.ml.open_job: + job_id: job-1 + + - do: + xpack.ml.start_datafeed: + "datafeed_id": "datafeed-1" + "start": 0 + + - do: + xpack.ml.open_job: + job_id: job-2 + + - do: + xpack.ml.start_datafeed: + "datafeed_id": "datafeed-2" + "start": 0 + + - do: + xpack.ml.get_datafeed_stats: {} + - match: { count: 2 } + - match: { datafeeds.0.datafeed_id: "datafeed-1"} + - match: { datafeeds.0.state: "started"} + - match: { datafeeds.1.datafeed_id: "datafeed-2"} + - match: { datafeeds.1.state: "started"} + +--- +"Test explicit get all datafeed stats given stopped datafeeds": - do: xpack.ml.get_datafeed_stats: @@ -110,7 +139,7 @@ setup: - match: { datafeeds.1.state: "stopped"} --- -"Test implicit get all datafeed stats": +"Test implicit get all datafeed stats given stopped datafeeds": - do: xpack.ml.get_datafeed_stats: {}