Verify that the failure reason of analytics process is empty (#48042) (#48071)

This commit is contained in:
Przemysław Witek 2019-10-15 18:33:20 +02:00 committed by GitHub
parent aff0c9babc
commit eaa56344b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 42 additions and 38 deletions

View File

@ -232,6 +232,10 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
return state;
}
public String getFailureReason() {
return failureReason;
}
public List<PhaseProgress> getProgress() {
return progress;
}
@ -340,6 +344,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
return Objects.equals(id, other.id)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.failureReason, other.failureReason)
&& Objects.equals(this.progress, other.progress)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
}

View File

@ -15,7 +15,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParamsTests;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.junit.After;
@ -60,7 +59,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
@ -98,7 +97,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
@ -141,7 +140,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
@ -194,7 +193,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);

View File

@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
/**
@ -119,8 +120,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
}
protected void waitUntilAnalyticsIsStopped(String id, TimeValue waitTime) throws Exception {
assertBusy(() -> assertThat(getAnalyticsStats(id).get(0).getState(), equalTo(DataFrameAnalyticsState.STOPPED)),
waitTime.getMillis(), TimeUnit.MILLISECONDS);
assertBusy(() -> assertIsStopped(id), waitTime.getMillis(), TimeUnit.MILLISECONDS);
}
protected List<DataFrameAnalyticsConfig> getAnalytics(String id) {
@ -128,11 +128,13 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
return client().execute(GetDataFrameAnalyticsAction.INSTANCE, request).actionGet().getResources().results();
}
protected List<GetDataFrameAnalyticsStatsAction.Response.Stats> getAnalyticsStats(String id) {
protected GetDataFrameAnalyticsStatsAction.Response.Stats getAnalyticsStats(String id) {
GetDataFrameAnalyticsStatsAction.Request request = new GetDataFrameAnalyticsStatsAction.Request(id);
GetDataFrameAnalyticsStatsAction.Response response = client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request)
.actionGet();
return response.getResponse().results();
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = response.getResponse().results();
assertThat("Got: " + stats.toString(), stats.size(), equalTo(1));
return stats.get(0);
}
protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex,
@ -145,18 +147,17 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
return configBuilder.build();
}
protected void assertState(String id, DataFrameAnalyticsState state) {
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = getAnalyticsStats(id);
assertThat(stats.size(), equalTo(1));
assertThat(stats.get(0).getId(), equalTo(id));
assertThat(stats.get(0).getState(), equalTo(state));
protected void assertIsStopped(String id) {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
assertThat(stats.getId(), equalTo(id));
assertThat(stats.getFailureReason(), is(nullValue()));
assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
}
protected void assertProgress(String id, int reindexing, int loadingData, int analyzing, int writingResults) {
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = getAnalyticsStats(id);
List<PhaseProgress> progress = stats.get(0).getProgress();
assertThat(stats.size(), equalTo(1));
assertThat(stats.get(0).getId(), equalTo(id));
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
assertThat(stats.getId(), equalTo(id));
List<PhaseProgress> progress = stats.getProgress();
assertThat(progress.size(), equalTo(4));
assertThat(progress.get(0).getPhase(), equalTo("reindexing"));
assertThat(progress.get(1).getPhase(), equalTo("loading_data"));

View File

@ -13,7 +13,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import org.junit.After;
@ -74,7 +73,7 @@ public class OutlierDetectionWithMissingFieldsIT extends MlNativeDataFrameAnalyt
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);

View File

@ -81,7 +81,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
@ -141,7 +141,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
@ -199,7 +199,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
@ -259,14 +259,14 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
registerAnalytics(config);
putAnalytics(config);
assertState(jobId, DataFrameAnalyticsState.STOPPED);
assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId);
// Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED.
assertBusy(() -> {
DataFrameAnalyticsState state = getAnalyticsStats(jobId).get(0).getState();
DataFrameAnalyticsState state = getAnalyticsStats(jobId).getState();
assertThat(state, is(anyOf(equalTo(DataFrameAnalyticsState.REINDEXING), equalTo(DataFrameAnalyticsState.ANALYZING),
equalTo(DataFrameAnalyticsState.STOPPED))));
});

View File

@ -80,7 +80,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);
@ -157,7 +157,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);
@ -228,7 +228,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);
@ -293,12 +293,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
startAnalytics(id);
// State here could be any of STARTED, REINDEXING or ANALYZING
assertThat(stopAnalytics(id).isStopped(), is(true));
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
if (indexExists(config.getDest().getIndex()) == false) {
// We stopped before we even created the destination index
return;
@ -363,7 +363,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);
@ -421,7 +421,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);
@ -480,7 +480,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(id));
assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST));
@ -525,14 +525,14 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
// Due to lazy start being allowed, this should succeed even though no node currently in the cluster is big enough
startAnalytics(id);
// Wait until state is STARTING, there is no node but there is an assignment explanation.
assertBusy(() -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id).get(0);
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STARTING));
assertThat(stats.getNode(), is(nullValue()));
assertThat(stats.getAssignmentExplanation(), containsString("persistent task is awaiting node assignment"));
@ -576,12 +576,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
startAnalytics(id);
// Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED.
assertBusy(() -> {
DataFrameAnalyticsState state = getAnalyticsStats(id).get(0).getState();
DataFrameAnalyticsState state = getAnalyticsStats(id).getState();
assertThat(state, is(anyOf(equalTo(DataFrameAnalyticsState.REINDEXING), equalTo(DataFrameAnalyticsState.ANALYZING),
equalTo(DataFrameAnalyticsState.STOPPED))));
});
@ -652,7 +652,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
assertIsStopped(id);
assertProgress(id, 0, 0, 0, 0);
startAnalytics(id);