diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java index a894faf5993..80dbf8bed1c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java @@ -232,6 +232,10 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType getProgress() { return progress; } @@ -340,6 +344,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType assertThat(getAnalyticsStats(id).get(0).getState(), equalTo(DataFrameAnalyticsState.STOPPED)), - waitTime.getMillis(), TimeUnit.MILLISECONDS); + assertBusy(() -> assertIsStopped(id), waitTime.getMillis(), TimeUnit.MILLISECONDS); } protected List getAnalytics(String id) { @@ -128,11 +128,13 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest return client().execute(GetDataFrameAnalyticsAction.INSTANCE, request).actionGet().getResources().results(); } - protected List getAnalyticsStats(String id) { + protected GetDataFrameAnalyticsStatsAction.Response.Stats getAnalyticsStats(String id) { GetDataFrameAnalyticsStatsAction.Request request = new GetDataFrameAnalyticsStatsAction.Request(id); GetDataFrameAnalyticsStatsAction.Response response = client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request) .actionGet(); - return response.getResponse().results(); + List stats = response.getResponse().results(); + assertThat("Got: " + stats.toString(), stats.size(), equalTo(1)); + return stats.get(0); } protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex, @@ -145,18 +147,17 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest return configBuilder.build(); } - protected void assertState(String id, DataFrameAnalyticsState state) { - List stats = getAnalyticsStats(id); - assertThat(stats.size(), equalTo(1)); - assertThat(stats.get(0).getId(), equalTo(id)); - assertThat(stats.get(0).getState(), equalTo(state)); + protected void assertIsStopped(String id) { + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); + assertThat(stats.getId(), equalTo(id)); + assertThat(stats.getFailureReason(), is(nullValue())); + assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED)); } protected void assertProgress(String id, int reindexing, int loadingData, int analyzing, int writingResults) { - List stats = getAnalyticsStats(id); - List progress = stats.get(0).getProgress(); - assertThat(stats.size(), equalTo(1)); - assertThat(stats.get(0).getId(), equalTo(id)); + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); + assertThat(stats.getId(), equalTo(id)); + List progress = stats.getProgress(); assertThat(progress.size(), equalTo(4)); assertThat(progress.get(0).getPhase(), equalTo("reindexing")); assertThat(progress.get(1).getPhase(), equalTo("loading_data")); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java index 741d74f8374..26ae36be99f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; import org.junit.After; @@ -74,7 +73,7 @@ public class OutlierDetectionWithMissingFieldsIT extends MlNativeDataFrameAnalyt registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index d73c5df40e6..7388fe260aa 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -81,7 +81,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { registerAnalytics(config); putAnalytics(config); - assertState(jobId, DataFrameAnalyticsState.STOPPED); + assertIsStopped(jobId); assertProgress(jobId, 0, 0, 0, 0); startAnalytics(jobId); @@ -141,7 +141,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { registerAnalytics(config); putAnalytics(config); - assertState(jobId, DataFrameAnalyticsState.STOPPED); + assertIsStopped(jobId); assertProgress(jobId, 0, 0, 0, 0); startAnalytics(jobId); @@ -199,7 +199,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { registerAnalytics(config); putAnalytics(config); - assertState(jobId, DataFrameAnalyticsState.STOPPED); + assertIsStopped(jobId); assertProgress(jobId, 0, 0, 0, 0); startAnalytics(jobId); @@ -259,14 +259,14 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { registerAnalytics(config); putAnalytics(config); - assertState(jobId, DataFrameAnalyticsState.STOPPED); + assertIsStopped(jobId); assertProgress(jobId, 0, 0, 0, 0); startAnalytics(jobId); // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. assertBusy(() -> { - DataFrameAnalyticsState state = getAnalyticsStats(jobId).get(0).getState(); + DataFrameAnalyticsState state = getAnalyticsStats(jobId).getState(); assertThat(state, is(anyOf(equalTo(DataFrameAnalyticsState.REINDEXING), equalTo(DataFrameAnalyticsState.ANALYZING), equalTo(DataFrameAnalyticsState.STOPPED)))); }); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 6a04afc0a06..f273cb5f1a0 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -80,7 +80,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id); @@ -157,7 +157,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id); @@ -228,7 +228,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id); @@ -293,12 +293,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); startAnalytics(id); // State here could be any of STARTED, REINDEXING or ANALYZING assertThat(stopAnalytics(id).isStopped(), is(true)); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); if (indexExists(config.getDest().getIndex()) == false) { // We stopped before we even created the destination index return; @@ -363,7 +363,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id); @@ -421,7 +421,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id); @@ -480,7 +480,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(id)); assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST)); @@ -525,14 +525,14 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); // Due to lazy start being allowed, this should succeed even though no node currently in the cluster is big enough startAnalytics(id); // Wait until state is STARTING, there is no node but there is an assignment explanation. assertBusy(() -> { - GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id).get(0); + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STARTING)); assertThat(stats.getNode(), is(nullValue())); assertThat(stats.getAssignmentExplanation(), containsString("persistent task is awaiting node assignment")); @@ -576,12 +576,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); startAnalytics(id); // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. assertBusy(() -> { - DataFrameAnalyticsState state = getAnalyticsStats(id).get(0).getState(); + DataFrameAnalyticsState state = getAnalyticsStats(id).getState(); assertThat(state, is(anyOf(equalTo(DataFrameAnalyticsState.REINDEXING), equalTo(DataFrameAnalyticsState.ANALYZING), equalTo(DataFrameAnalyticsState.STOPPED)))); }); @@ -652,7 +652,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest registerAnalytics(config); putAnalytics(config); - assertState(id, DataFrameAnalyticsState.STOPPED); + assertIsStopped(id); assertProgress(id, 0, 0, 0, 0); startAnalytics(id);