Fix and unmute testSetUpgradeMode_ExistingTaskGetsUnassigned (#55368) (#55452)

This commit is contained in:
Przemysław Witek 2020-04-20 13:29:29 +02:00 committed by GitHub
parent b9da307cd1
commit 7d5f74e964
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 35 additions and 14 deletions

View File

@ -139,6 +139,7 @@ public final class MlIndexAndAlias {
boolean addAlias,
boolean isHiddenAttributeAvailable,
ActionListener<Boolean> listener) {
logger.info("About to create first concrete index [{}] with alias [{}]", index, alias);
CreateIndexRequestBuilder requestBuilder = client.admin()
.indices()
.prepareCreate(index);
@ -175,6 +176,7 @@ public final class MlIndexAndAlias {
String newIndex,
boolean isHiddenAttributeAvailable,
ActionListener<Boolean> listener) {
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
IndicesAliasesRequest.AliasActions addNewAliasAction = IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias);
if (isHiddenAttributeAvailable) {
addNewAliasAction.isHidden(true);

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@ -502,7 +504,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55221")
public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception {
initialize("classification_set_upgrade_mode");
indexData(sourceIndex, 300, 0, KEYWORD_FIELD);
@ -520,18 +521,33 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(analyticsTaskList(), hasSize(1));
assertThat(analyticsAssignedTaskList(), is(empty()));
assertBusy(() -> {
try {
GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(analyticsStats.getNode(), is(nullValue()));
} catch (ElasticsearchException e) {
logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e);
fail(e.getDetailedMessage());
}
});
setUpgradeModeTo(false);
assertThat(analyticsTaskList(), hasSize(1));
assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1)));
analyticsStats = getAnalyticsStats(jobId);
assertBusy(() -> {
try {
GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
} catch (ElasticsearchException e) {
logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e);
fail(e.getDetailedMessage());
}
});
waitUntilAnalyticsIsStopped(jobId);
assertProgress(jobId, 100, 100, 100, 100);
}
public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception {

View File

@ -210,11 +210,15 @@ public class TransportGetDataFrameAnalyticsStatsAction
executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap(
multiSearchResponse -> {
for (MultiSearchResponse.Item itemResponse : multiSearchResponse.getResponses()) {
MultiSearchResponse.Item[] itemResponses = multiSearchResponse.getResponses();
for (int i = 0; i < itemResponses.length; ++i) {
MultiSearchResponse.Item itemResponse = itemResponses[i];
if (itemResponse.isFailure()) {
SearchRequest itemRequest = multiSearchRequest.requests().get(i);
logger.error(
new ParameterizedMessage(
"[{}] Item failure encountered during multi search: {}", configId, itemResponse.getFailureMessage()),
"[{}] Item failure encountered during multi search for request [indices={}, source={}]: {}",
configId, itemRequest.indices(), itemRequest.source(), itemResponse.getFailureMessage()),
itemResponse.getFailure());
listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure()));
return;

View File

@ -640,9 +640,9 @@ public class TransportStartDataFrameAnalyticsAction
@Override
protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyticsAction.TaskParams params,
PersistentTaskState state) {
logger.info("[{}] Starting data frame analytics", params.getId());
DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
logger.info("[{}] Starting data frame analytics from state [{}]", params.getId(), analyticsState);
// If we are "stopping" there is nothing to do and we should stop
if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {

View File

@ -11,7 +11,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
@ -267,10 +266,6 @@ public class AnalyticsProcessManager {
process.writeRecord(headerRecord);
}
private void indexDataCounts(DataCounts dataCounts) {
IndexRequest indexRequest = new IndexRequest(MlStatsIndex.writeAlias());
}
private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state,
AnalyticsProcess<AnalyticsResult> process) {
if (config.getAnalysis().persistsState() == false) {

View File

@ -105,6 +105,7 @@ public class TrainedModelStatsService {
}
void stop() {
logger.info("About to stop TrainedModelStatsService");
stopped = true;
statsQueue.clear();
@ -115,6 +116,7 @@ public class TrainedModelStatsService {
}
void start() {
logger.info("About to start TrainedModelStatsService");
stopped = false;
scheduledFuture = threadPool.scheduleWithFixedDelay(this::updateStats,
PERSISTENCE_INTERVAL,
@ -126,11 +128,13 @@ public class TrainedModelStatsService {
return;
}
if (verifiedStatsIndexCreated == false) {
logger.info("About to create the stats index as it does not exist yet");
try {
PlainActionFuture<Boolean> listener = new PlainActionFuture<>();
MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, indexNameExpressionResolver, listener);
listener.actionGet();
verifiedStatsIndexCreated = true;
logger.info("Created stats index");
} catch (Exception e) {
logger.error("failure creating ml stats index for storing model stats", e);
return;