Use dedicated ML APIs in tests (#30941)
ML has dedicated APIs for datafeeds and jobs yet base test classes and some tests were relying on the cluster state for this state. This commit removes this usage in favor of using the dedicated endpoints.
This commit is contained in:
parent
83a7ade7c5
commit
bcfdccaf3f
|
@ -6,6 +6,8 @@
|
|||
package org.elasticsearch.xpack.core.ml.integration;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
|
@ -35,10 +37,12 @@ public class MlRestTestStateCleaner {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void deleteAllDatafeeds() throws IOException {
|
||||
Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
|
||||
Collections.singletonMap("filter_path", "metadata.ml.datafeeds")));
|
||||
List<Map<String, Object>> datafeeds =
|
||||
(List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", clusterStateAsMap);
|
||||
final Request datafeedsRequest = new Request("GET", "/_xpack/ml/datafeeds");
|
||||
datafeedsRequest.addParameter("filter_path", "datafeeds");
|
||||
final Response datafeedsResponse = adminClient.performRequest(datafeedsRequest);
|
||||
@SuppressWarnings("unchecked")
|
||||
final List<Map<String, Object>> datafeeds =
|
||||
(List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", testCase.entityAsMap(datafeedsResponse));
|
||||
if (datafeeds == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -75,11 +79,12 @@ public class MlRestTestStateCleaner {
|
|||
}
|
||||
|
||||
private void deleteAllJobs() throws IOException {
|
||||
Map<String, Object> clusterStateAsMap = testCase.entityAsMap(adminClient.performRequest("GET", "/_cluster/state",
|
||||
Collections.singletonMap("filter_path", "metadata.ml.jobs")));
|
||||
final Request jobsRequest = new Request("GET", "/_xpack/ml/anomaly_detectors");
|
||||
jobsRequest.addParameter("filter_path", "jobs");
|
||||
final Response response = adminClient.performRequest(jobsRequest);
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Map<String, Object>> jobConfigs =
|
||||
(List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.jobs", clusterStateAsMap);
|
||||
final List<Map<String, Object>> jobConfigs =
|
||||
(List<Map<String, Object>>) XContentMapValues.extractValue("jobs", testCase.entityAsMap(response));
|
||||
if (jobConfigs == null) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,10 @@ import org.elasticsearch.test.ESIntegTestCase;
|
|||
import org.elasticsearch.test.MockHttpTransport;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.core.ml.client.MachineLearningClient;
|
||||
import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.core.ml.MachineLearningField;
|
||||
|
@ -271,7 +275,9 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception {
|
||||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState());
|
||||
final MachineLearningClient mlClient = new MachineLearningClient(client);
|
||||
final QueryPage<DatafeedConfig> datafeeds =
|
||||
mlClient.getDatafeeds(new GetDatafeedsAction.Request(GetDatafeedsAction.ALL)).actionGet().getResponse();
|
||||
try {
|
||||
logger.info("Closing all datafeeds (using _all)");
|
||||
StopDatafeedAction.Response stopResponse = client
|
||||
|
@ -292,11 +298,10 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
|
|||
"Had to resort to force-stopping datafeed, something went wrong?", e1);
|
||||
}
|
||||
|
||||
for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) {
|
||||
String datafeedId = datafeed.getId();
|
||||
for (final DatafeedConfig datafeed : datafeeds.results()) {
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
|
||||
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeed.getId());
|
||||
GetDatafeedsStatsAction.Response r = client.execute(GetDatafeedsStatsAction.INSTANCE, request).get();
|
||||
assertThat(r.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
|
@ -304,13 +309,14 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
|
|||
}
|
||||
});
|
||||
DeleteDatafeedAction.Response deleteResponse =
|
||||
client.execute(DeleteDatafeedAction.INSTANCE, new DeleteDatafeedAction.Request(datafeedId)).get();
|
||||
client.execute(DeleteDatafeedAction.INSTANCE, new DeleteDatafeedAction.Request(datafeed.getId())).get();
|
||||
assertTrue(deleteResponse.isAcknowledged());
|
||||
}
|
||||
}
|
||||
|
||||
public static void deleteAllJobs(Logger logger, Client client) throws Exception {
|
||||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(client.admin().cluster().prepareState().get().getState());
|
||||
final MachineLearningClient mlClient = new MachineLearningClient(client);
|
||||
final QueryPage<Job> jobs = mlClient.getJobs(new GetJobsAction.Request(MetaData.ALL)).actionGet().getResponse();
|
||||
|
||||
try {
|
||||
CloseJobAction.Request closeRequest = new CloseJobAction.Request(MetaData.ALL);
|
||||
|
@ -334,15 +340,14 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
|
|||
e1);
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Job> entry : mlMetadata.getJobs().entrySet()) {
|
||||
String jobId = entry.getKey();
|
||||
for (final Job job : jobs.results()) {
|
||||
assertBusy(() -> {
|
||||
GetJobsStatsAction.Response statsResponse =
|
||||
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(jobId)).actionGet();
|
||||
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
|
||||
assertEquals(JobState.CLOSED, statsResponse.getResponse().results().get(0).getState());
|
||||
});
|
||||
DeleteJobAction.Response response =
|
||||
client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).get();
|
||||
client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(job.getId())).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -548,10 +548,9 @@
|
|||
- do:
|
||||
headers:
|
||||
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
||||
cluster.state:
|
||||
metric: [ metadata ]
|
||||
filter_path: metadata.persistent_tasks
|
||||
- match: {"metadata.persistent_tasks.tasks.0.task.xpack/ml/job.status.state": opened}
|
||||
xpack.ml.get_job_stats:
|
||||
job_id: jobs-crud-close-job
|
||||
- match: {"jobs.0.state": opened}
|
||||
|
||||
- do:
|
||||
xpack.ml.close_job:
|
||||
|
@ -561,11 +560,9 @@
|
|||
- do:
|
||||
headers:
|
||||
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
||||
cluster.state:
|
||||
metric: [ metadata ]
|
||||
filter_path: metadata.persistent_tasks
|
||||
- match:
|
||||
metadata.persistent_tasks.tasks: []
|
||||
xpack.ml.get_job_stats:
|
||||
job_id: jobs-crud-close-job
|
||||
- match: {"jobs.0.state": closed}
|
||||
|
||||
---
|
||||
"Test closing a closed job isn't an error":
|
||||
|
@ -789,10 +786,9 @@
|
|||
- do:
|
||||
headers:
|
||||
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
||||
cluster.state:
|
||||
metric: [ metadata ]
|
||||
filter_path: metadata.persistent_tasks
|
||||
- match: {"metadata.persistent_tasks.tasks.0.task.xpack/ml/job.status.state": opened}
|
||||
xpack.ml.get_job_stats:
|
||||
job_id: jobs-crud-force-close-job
|
||||
- match: {"jobs.0.state": opened}
|
||||
|
||||
- do:
|
||||
xpack.ml.close_job:
|
||||
|
@ -803,11 +799,9 @@
|
|||
- do:
|
||||
headers:
|
||||
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
|
||||
cluster.state:
|
||||
metric: [ metadata ]
|
||||
filter_path: metadata.persistent_tasks
|
||||
- match:
|
||||
metadata.persistent_tasks.tasks: []
|
||||
xpack.ml.get_job_stats:
|
||||
job_id: jobs-crud-force-close-job
|
||||
- match: {"jobs.0.state": closed}
|
||||
|
||||
---
|
||||
"Test force closing a closed job isn't an error":
|
||||
|
|
Loading…
Reference in New Issue