[Ml] Prevent config snapshot failure blocking migration (#37493)

This commit is contained in:
David Kyle 2019-01-16 08:38:46 +00:00
parent 347cbaf0ed
commit 75410dc632
2 changed files with 65 additions and 1 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
@ -460,7 +461,14 @@ public class MlConfigMigrator {
indexResponse -> {
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
listener::onFailure),
e -> {
if (e instanceof VersionConflictEngineException) {
// the snapshot already exists
listener.onResponse(Boolean.TRUE);
} else {
listener.onFailure(e);
}
}),
client::index
);
}

View File

@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@ -180,6 +183,59 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
}
public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException {
// index a doc with the same Id as the config snapshot
IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
ElasticsearchMappings.DOC_TYPE, "ml-config")
.setSource(Collections.singletonMap("a_field", "a_value"))
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequest.execute().actionGet();
// define the configs
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.routingTable(routingTable.build())
.build();
doAnswer(invocation -> {
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
return null;
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
// do the migration
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
// writing the snapshot should fail because the doc already exists
// in which case the migration should continue
blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
responseHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertTrue(responseHolder.get());
// check the jobs have been migrated
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
jobsHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertThat(jobsHolder.get(), hasSize(1));
assertTrue(jobsHolder.get().get(0).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION));
assertEquals("job-foo", jobsHolder.get().get(0).build().getId());
}
public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
int jobCount = randomIntBetween(150, 201);
int datafeedCount = randomIntBetween(150, jobCount);