[ML] Correctly collect datafeed stats for _all (elastic/x-pack-elasticsearch#776)

Datafeed state was always stopped when the stats
endpoint was called with _all. The reason is that
_all was not being expanded into all datafeed IDs.

This commit fixes the issue by expanding _all into
all datafeed IDs in the cluster. Stats are then fetched
for all of them.

relates elastic/x-pack-elasticsearch#693

Original commit: elastic/x-pack-elasticsearch@ccbdb35b6e
This commit is contained in:
Dimitris Athanasiou 2017-03-20 17:51:36 +00:00 committed by GitHub
parent f96a40c61a
commit 025461c3c8
2 changed files with 62 additions and 20 deletions

View File

@ -43,9 +43,12 @@ import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors;
public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Request, GetDatafeedsStatsAction.Response, public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Request, GetDatafeedsStatsAction.Response,
GetDatafeedsStatsAction.RequestBuilder> { GetDatafeedsStatsAction.RequestBuilder> {
@ -299,34 +302,44 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
} }
@Override @Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception { protected void masterOperation(Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
Map<String, DatafeedStats> results = new HashMap<>();
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
PersistentTasks tasksInProgress = state.getMetaData().custom(PersistentTasks.TYPE);
if (request.getDatafeedId().equals(ALL) == false && mlMetadata.getDatafeed(request.getDatafeedId()) == null) { if (request.getDatafeedId().equals(ALL) == false
&& mlMetadata.getDatafeed(request.getDatafeedId()) == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
} }
for (DatafeedConfig datafeedConfig : mlMetadata.getDatafeeds().values()) { List<String> expandedDatafeedsIds = request.getDatafeedId().equals(ALL) ?
if (request.getDatafeedId().equals(ALL) || datafeedConfig.getId().equals(request.getDatafeedId())) { mlMetadata.getDatafeeds().values().stream()
PersistentTask<?> task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress); .map(d -> d.getId()).collect(Collectors.toList())
DatafeedState datafeedState = MlMetadata.getDatafeedState(request.getDatafeedId(), tasksInProgress); : Collections.singletonList(request.getDatafeedId());
DiscoveryNode node = null;
String explanation = null; PersistentTasks tasksInProgress = state.getMetaData().custom(PersistentTasks.TYPE);
if (task != null) { List<DatafeedStats> results = expandedDatafeedsIds.stream()
node = state.nodes().get(task.getExecutorNode()); .map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
explanation = task.getAssignment().getExplanation(); .collect(Collectors.toList());
} QueryPage<DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
results.put(datafeedConfig.getId(), new DatafeedStats(datafeedConfig.getId(), datafeedState, node, explanation));
}
}
QueryPage<DatafeedStats> statsPage = new QueryPage<>(new ArrayList<>(results.values()), results.size(),
DatafeedConfig.RESULTS_FIELD); DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new Response(statsPage)); listener.onResponse(new Response(statsPage));
} }
private static DatafeedStats getDatafeedStats(String datafeedId, ClusterState state,
PersistentTasks tasks) {
PersistentTask<?> task = MlMetadata.getDatafeedTask(datafeedId, tasks);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
DiscoveryNode node = null;
String explanation = null;
if (task != null) {
node = state.nodes().get(task.getExecutorNode());
explanation = task.getAssignment().getExplanation();
}
return new DatafeedStats(datafeedId, datafeedState, node, explanation);
}
@Override @Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) { protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);

View File

@ -98,7 +98,36 @@ setup:
- match: { datafeeds.0.node.attributes.max_running_jobs: "10"} - match: { datafeeds.0.node.attributes.max_running_jobs: "10"}
--- ---
"Test explicit get all datafeed stats": "Test implicit get all datafeed stats given started datafeeds":
- do:
xpack.ml.open_job:
job_id: job-1
- do:
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
- do:
xpack.ml.open_job:
job_id: job-2
- do:
xpack.ml.start_datafeed:
"datafeed_id": "datafeed-2"
"start": 0
- do:
xpack.ml.get_datafeed_stats: {}
- match: { count: 2 }
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "started"}
- match: { datafeeds.1.datafeed_id: "datafeed-2"}
- match: { datafeeds.1.state: "started"}
---
"Test explicit get all datafeed stats given stopped datafeeds":
- do: - do:
xpack.ml.get_datafeed_stats: xpack.ml.get_datafeed_stats:
@ -110,7 +139,7 @@ setup:
- match: { datafeeds.1.state: "stopped"} - match: { datafeeds.1.state: "stopped"}
--- ---
"Test implicit get all datafeed stats": "Test implicit get all datafeed stats given stopped datafeeds":
- do: - do:
xpack.ml.get_datafeed_stats: {} xpack.ml.get_datafeed_stats: {}