ML: removing unnecessary upgrade code (#37879)

This commit is contained in:
Benjamin Trent 2019-01-25 13:57:41 -06:00 committed by GitHub
parent 5cd4dfb0e4
commit 9e932f4869
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 3 additions and 1626 deletions

View File

@ -107,7 +107,6 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@ -292,7 +291,6 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
FindFileStructureAction.INSTANCE,
MlUpgradeAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,

View File

@ -1,160 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class MlUpgradeAction extends Action<AcknowledgedResponse> {
public static final MlUpgradeAction INSTANCE = new MlUpgradeAction();
public static final String NAME = "cluster:admin/xpack/ml/upgrade";
private MlUpgradeAction() {
super(NAME);
}
@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
public static class Request extends MasterNodeReadRequest<Request> implements ToXContentObject {
private static final ParseField REINDEX_BATCH_SIZE = new ParseField("reindex_batch_size");
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>("ml_upgrade", true, Request::new);
static {
PARSER.declareInt(Request::setReindexBatchSize, REINDEX_BATCH_SIZE);
}
static final String INDEX = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "*";
private int reindexBatchSize = 1000;
/**
* Should this task store its result?
*/
private boolean shouldStoreResult;
// for serialization
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
reindexBatchSize = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(reindexBatchSize);
}
public String[] indices() {
return new String[]{INDEX};
}
public IndicesOptions indicesOptions() {
return IndicesOptions.strictExpandOpenAndForbidClosed();
}
/**
* Should this task store its result after it has finished?
*/
public Request setShouldStoreResult(boolean shouldStoreResult) {
this.shouldStoreResult = shouldStoreResult;
return this;
}
@Override
public boolean getShouldStoreResult() {
return shouldStoreResult;
}
public Request setReindexBatchSize(int reindexBatchSize) {
this.reindexBatchSize = reindexBatchSize;
return this;
}
public int getReindexBatchSize() {
return reindexBatchSize;
}
@Override
public ActionRequestValidationException validate() {
if (reindexBatchSize <= 0) {
ActionRequestValidationException validationException = new ActionRequestValidationException();
validationException.addValidationError("["+ REINDEX_BATCH_SIZE.getPreferredName()+"] must be greater than 0.");
return validationException;
}
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Request request = (Request) o;
return Objects.equals(reindexBatchSize, request.reindexBatchSize);
}
@Override
public int hashCode() {
return Objects.hash(reindexBatchSize);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "ml-upgrade", parentTaskId, headers) {
@Override
public boolean shouldCancelChildrenOnCancellation() {
return true;
}
};
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(REINDEX_BATCH_SIZE.getPreferredName(), reindexBatchSize);
builder.endObject();
return builder;
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class MlUpgradeRequestTests extends AbstractWireSerializingTestCase<MlUpgradeAction.Request> {
@Override
protected MlUpgradeAction.Request createTestInstance() {
MlUpgradeAction.Request request = new MlUpgradeAction.Request();
if (randomBoolean()) {
request.setReindexBatchSize(randomIntBetween(1, 10_000));
}
return request;
}
@Override
protected Writeable.Reader<MlUpgradeAction.Request> instanceReader() {
return MlUpgradeAction.Request::new;
}
}

View File

@ -93,9 +93,7 @@ integTestRunner {
'ml/validate/Test job config that is invalid only because of the job ID',
'ml/validate_detector/Test invalid detector',
'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts',
'ml/delete_forecast/Test delete forecast on missing forecast',
'ml/ml_upgrade/Upgrade results when there is nothing to upgrade',
'ml/ml_upgrade/Upgrade results when there is nothing to upgrade not waiting for results'
'ml/delete_forecast/Test delete forecast on missing forecast'
].join(',')
}

View File

@ -1,378 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService;
import org.junit.After;
import org.junit.Assert;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
public class MlUpgradeIT extends MlNativeAutodetectIntegTestCase {
@After
public void cleanup() throws Exception {
cleanUp();
}
public void testMigrationWhenItIsNotNecessary() throws Exception {
String jobId1 = "no-migration-test1";
String jobId2 = "no-migration-test2";
String jobId3 = "no-migration-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
long job1Total = getTotalDocCount(job1Index);
long job2Total = getTotalDocCount(job2Index);
long job3Total = getTotalDocCount(job3Index);
AcknowledgedResponse resp = ESIntegTestCase.client().execute(MlUpgradeAction.INSTANCE,
new MlUpgradeAction.Request()).actionGet();
assertThat(resp.isAcknowledged(), is(true));
// Migration should have done nothing
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
assertThat(getTotalDocCount(job1Index), equalTo(job1Total));
assertThat(getTotalDocCount(job2Index), equalTo(job2Total));
assertThat(getTotalDocCount(job3Index), equalTo(job3Total));
ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
// Our backing index size should be two as we have a shared and custom index
assertThat(indices.length, equalTo(2));
}
public void testMigration() throws Exception {
String jobId1 = "migration-test1";
String jobId2 = "migration-test2";
String jobId3 = "migration-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
long job1Total = getJobResultsCount(job1.getId());
long job2Total = getJobResultsCount(job2.getId());
long job3Total = getJobResultsCount(job3.getId());
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
ThreadPool.Names.SAME,
indexMetaData -> true);
PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
new MlUpgradeAction.Request(),
ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
future);
AcknowledgedResponse response = future.get();
assertThat(response.isAcknowledged(), is(true));
assertThat(indexExists(job1Index), is(false));
assertThat(indexExists(job2Index), is(false));
assertThat(indexExists(job3Index), is(false));
ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
// Our backing index size should be four as we have a shared and custom index and upgrading doubles the number of indices
Assert.assertThat(indices.length, equalTo(4));
refresh(indices);
assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total));
assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total));
assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total));
// WE should still be able to write, and the aliases should allow to read from the appropriate indices
postDataToJob(jobId1);
postDataToJob(jobId2);
postDataToJob(jobId3);
// We should also be able to create new jobs and old jobs should be unaffected.
String jobId4 = "migration-test4";
Job job4 = createAndOpenJobAndStartDataFeedWithData(jobId4, dataIndex, false);
waitUntilJobIsClosed(jobId4);
indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
refresh(indices);
long newJob1Total = getJobResultsCount(job1.getId());
assertThat(newJob1Total, greaterThan(job1Total));
long newJob2Total = getJobResultsCount(job2.getId());
assertThat(newJob2Total, greaterThan(job2Total));
long newJob3Total = getJobResultsCount(job3.getId());
assertThat(newJob3Total, greaterThan(job3Total));
assertThat(getJobResultsCount(jobId4), greaterThan(0L));
assertThat(getJobResultsCount(jobId1), equalTo(newJob1Total));
assertThat(getJobResultsCount(jobId2), equalTo(newJob2Total));
assertThat(getJobResultsCount(jobId3), equalTo(newJob3Total));
}
//I think this test name could be a little bit longer....
public void testMigrationWithManuallyCreatedIndexThatNeedsMigrating() throws Exception {
String jobId1 = "migration-failure-test1";
String jobId2 = "migration-failure-test2";
String jobId3 = "migration-failure-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
// This index name should match one of the automatically created migration indices
String manuallyCreatedIndex = job1Index + "-" + Version.CURRENT.major;
client().admin().indices().prepareCreate(manuallyCreatedIndex).execute().actionGet();
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
ThreadPool.Names.SAME,
indexMetaData -> true); //indicates that this manually created index needs migrated
resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
new MlUpgradeAction.Request(),
ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
ActionListener.wrap(
resp -> fail(),
exception -> {
assertThat(exception, instanceOf(IllegalStateException.class));
assertThat(exception.getMessage(),
equalTo("Index [" + manuallyCreatedIndex + "] already exists and is not the current version."));
}
));
}
public void testMigrationWithExistingIndexWithData() throws Exception {
String jobId1 = "partial-migration-test1";
String jobId2 = "partial-migration-test2";
String jobId3 = "partial-migration-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
long job1Total = getJobResultsCount(job1.getId());
long job2Total = getJobResultsCount(job2.getId());
long job3Total = getJobResultsCount(job3.getId());
//lets manually create a READ index with reindexed data already
// Should still get aliased appropriately without any additional/duplicate data.
String alreadyMigratedIndex = job1Index + "-" + Version.CURRENT.major + "r";
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(job1Index);
reindexRequest.setDestIndex(alreadyMigratedIndex);
client().execute(ReindexAction.INSTANCE, reindexRequest).actionGet();
//New write index as well, should still get aliased appropriately
String alreadyMigratedWriteIndex = job1Index + "-" + Version.CURRENT.major;
client().admin().indices().prepareCreate(alreadyMigratedWriteIndex).execute().actionGet();
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
ThreadPool.Names.SAME,
//indicates that this manually created index is already migrated and should not be included in our migration steps
indexMetaData -> !(indexMetaData.getIndex().getName().equals(alreadyMigratedIndex) ||
indexMetaData.getIndex().getName().equals(alreadyMigratedWriteIndex)));
PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
new MlUpgradeAction.Request(),
ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
future);
AcknowledgedResponse response = future.get();
assertThat(response.isAcknowledged(), is(true));
assertThat(indexExists(job1Index), is(false));
assertThat(indexExists(job2Index), is(false));
assertThat(indexExists(job3Index), is(false));
ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
// Our backing index size should be four as we have a shared and custom index and upgrading doubles the number of indices
Assert.assertThat(indices.length, equalTo(4));
refresh(indices);
assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total));
assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total));
assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total));
// WE should still be able to write, and the aliases should allow to read from the appropriate indices
postDataToJob(jobId1);
postDataToJob(jobId2);
postDataToJob(jobId3);
refresh(indices);
long newJob1Total = getJobResultsCount(job1.getId());
assertThat(newJob1Total, greaterThan(job1Total));
long newJob2Total = getJobResultsCount(job2.getId());
assertThat(newJob2Total, greaterThan(job2Total));
long newJob3Total = getJobResultsCount(job3.getId());
assertThat(newJob3Total, greaterThan(job3Total));
}
private long getTotalDocCount(String indexName) {
SearchResponse searchResponse = ESIntegTestCase.client().prepareSearch(indexName)
.setSize(10_000)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.matchAllQuery())
.execute().actionGet();
return searchResponse.getHits().getTotalHits().value;
}
private long getJobResultsCount(String jobId) {
String index = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + jobId;
return getTotalDocCount(index);
}
private void postDataToJob(String jobId) throws Exception {
openJob(jobId);
ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(jobId).get(0).getState(), JobState.OPENED));
startDatafeed(jobId + "-datafeed", 0L, System.currentTimeMillis());
waitUntilJobIsClosed(jobId);
}
private Job createAndOpenJobAndStartDataFeedWithData(String jobId, String dataIndex, boolean isCustom) throws Exception {
Job.Builder jobbuilder = createScheduledJob(jobId);
if (isCustom) {
jobbuilder.setResultsIndexName(jobId);
}
registerJob(jobbuilder);
Job job = putJob(jobbuilder).getResponse();
openJob(job.getId());
ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
DatafeedConfig.Builder builder = createDatafeedBuilder(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList(dataIndex));
builder.setQueryDelay(TimeValue.timeValueSeconds(5));
builder.setFrequency(TimeValue.timeValueSeconds(5));
DatafeedConfig datafeedConfig = builder.build();
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, System.currentTimeMillis());
waitUntilJobIsClosed(jobId);
return job;
}
private Tuple<Long, String> createDataIndex() {
ESIntegTestCase.client().admin().indices().prepareCreate("data-for-migration-1")
.addMapping("type", "time", "type=date")
.get();
long numDocs = ESTestCase.randomIntBetween(32, 512);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs(logger, "data-for-migration-1", numDocs, twoWeeksAgo, oneWeekAgo);
return new Tuple<>(numDocs, "data-for-migration-1");
}
private List<Job> createJobsWithData(String sharedJobId1, String sharedJobId2, String customJobId, String dataIndex) throws Exception {
Job job1 = createAndOpenJobAndStartDataFeedWithData(sharedJobId1, dataIndex, false);
Job job2 = createAndOpenJobAndStartDataFeedWithData(sharedJobId2, dataIndex, false);
Job job3 = createAndOpenJobAndStartDataFeedWithData(customJobId, dataIndex, true);
return Arrays.asList(job1, job2, job3);
}
}

View File

@ -96,7 +96,6 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@ -152,7 +151,6 @@ import org.elasticsearch.xpack.ml.action.TransportPutCalendarAction;
import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportPutFilterAction;
import org.elasticsearch.xpack.ml.action.TransportPutJobAction;
import org.elasticsearch.xpack.ml.action.TransportMlUpgradeAction;
import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction;
@ -232,7 +230,6 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetCategoriesAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetInfluencersAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.results.RestUpgradeMlAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
@ -545,8 +542,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new RestPutCalendarJobAction(settings, restController),
new RestGetCalendarEventsAction(settings, restController),
new RestPostCalendarEventAction(settings, restController),
new RestFindFileStructureAction(settings, restController),
new RestUpgradeMlAction(settings, restController)
new RestFindFileStructureAction(settings, restController)
);
}
@ -604,8 +600,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class),
new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class),
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class),
new ActionHandler<>(MlUpgradeAction.INSTANCE, TransportMlUpgradeAction.class)
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class)
);
}
@Override

View File

@ -1,513 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* ML Job results index upgrade service
*/
public class ResultsIndexUpgradeService {
private static final Logger logger = LogManager.getLogger(ResultsIndexUpgradeService.class);
// Adjust the following constants as necessary for various versions and backports.
private static final int INDEX_VERSION = Version.CURRENT.major;
private static final Version MIN_REQUIRED_VERSION = Version.CURRENT.minimumCompatibilityVersion();
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Predicate<IndexMetaData> shouldUpgrade;
private final String executor;
/**
* Construct a new upgrade service
*
* @param indexNameExpressionResolver Index expression resolver for the request
* @param executor Where to execute client calls
* @param shouldUpgrade Given IndexMetadata indicate if it should be upgraded or not
* {@code true} indicates that it SHOULD upgrade
*/
public ResultsIndexUpgradeService(IndexNameExpressionResolver indexNameExpressionResolver,
String executor,
Predicate<IndexMetaData> shouldUpgrade) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.shouldUpgrade = shouldUpgrade;
this.executor = executor;
}
public static boolean wasIndexCreatedInCurrentMajorVersion(IndexMetaData indexMetaData) {
return indexMetaData.getCreationVersion().major == INDEX_VERSION;
}
/**
* There are two reasons for these indices to exist:
* 1. The upgrade process has ran before and either failed for some reason, or the end user is simply running it again.
* Either way, it should be ok to proceed as this action SHOULD be idempotent,
* unless the shouldUpgrade predicate is poorly formed
* 2. This index was created manually by the user. If the index was created manually and actually needs upgrading, then
* we consider the "new index" to be invalid as the passed predicate indicates that it still needs upgrading.
*
* @param metaData Cluster metadata
* @param newIndexName The index to check
* @param shouldUpgrade Should be index be upgraded
* @return {@code true} if the "new index" is valid
*/
private static boolean validNewIndex(MetaData metaData, String newIndexName, Predicate<IndexMetaData> shouldUpgrade) {
return (metaData.hasIndex(newIndexName) && shouldUpgrade.test(metaData.index(newIndexName))) == false;
}
private static void validateMinNodeVersion(ClusterState clusterState) {
if (clusterState.nodes().getMinNodeVersion().before(MIN_REQUIRED_VERSION)) {
throw new IllegalStateException("All nodes should have at least version [" + MIN_REQUIRED_VERSION + "] to upgrade");
}
}
// This method copies the behavior of the normal {index}/_upgrade rest response handler
private static Tuple<RestStatus, Throwable> getStatusAndCause(BulkByScrollResponse response) {
/*
* Return the highest numbered rest status under the assumption that higher numbered statuses are "more error"
* and thus more interesting to the user.
*/
RestStatus status = RestStatus.OK;
Throwable cause = null;
if (response.isTimedOut()) {
status = RestStatus.REQUEST_TIMEOUT;
cause = new ElasticsearchTimeoutException("Reindex request timed out");
}
for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
if (failure.getStatus().getStatus() > status.getStatus()) {
status = failure.getStatus();
cause = failure.getCause();
}
}
for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) {
RestStatus failureStatus = ExceptionsHelper.status(failure.getReason());
if (failureStatus.getStatus() > status.getStatus()) {
status = failureStatus;
cause = failure.getReason();
}
}
return new Tuple<>(status, cause);
}
/**
* Upgrade the indices given in the request.
*
* @param client The client to use when making calls
* @param request The upgrade request
* @param state The current cluster state
* @param listener The listener to alert when actions have completed
*/
public void upgrade(Client client, MlUpgradeAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
try {
validateMinNodeVersion(state);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices());
MetaData metaData = state.getMetaData();
List<String> indicesToUpgrade = Arrays.stream(concreteIndices)
.filter(indexName -> shouldUpgrade.test(metaData.index(indexName)))
.collect(Collectors.toList());
// All the internal indices are up to date
if (indicesToUpgrade.isEmpty()) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
IndexNameAndAliasProvider indexNameAndAliasProvider = new IndexNameAndAliasProvider(indicesToUpgrade, metaData);
Exception validationException = indexNameAndAliasProvider.validate(metaData, shouldUpgrade);
if (validationException != null) {
listener.onFailure(validationException);
return;
}
// <7> Now that we have deleted the old indices, we are complete, alert the user
ActionListener<AcknowledgedResponse> deleteIndicesListener = ActionListener.wrap(
listener::onResponse,
error -> {
String msg = "Failed to delete old indices: " + Strings.collectionToCommaDelimitedString(indicesToUpgrade);
logger.error(msg, error);
listener.onFailure(new ElasticsearchException(msg, error));
}
);
// <6> Now that aliases are moved, need to delete the old indices
ActionListener<AcknowledgedResponse> readAliasListener = ActionListener.wrap(
resp -> deleteOldIndices(client, indicesToUpgrade, deleteIndicesListener),
error -> {
String msg = "Failed adjusting aliases from old indices to new.";
logger.error(msg, error);
listener.onFailure(new ElasticsearchException(msg, error));
}
);
// <5> Documents are now reindexed, time to move read aliases
ActionListener<Boolean> reindexListener = ActionListener.wrap(
resp ->
// Need to make indices writable again so that the aliases can be removed from them
removeReadOnlyBlock(client, indicesToUpgrade,
ActionListener.wrap(
rrob -> adjustAliases(client,
indexNameAndAliasProvider.oldIndicesWithReadAliases(),
indexNameAndAliasProvider.newReadIndicesWithReadAliases(),
readAliasListener),
rrobFailure -> {
String msg = "Failed making old indices writable again so that aliases can be moved.";
logger.error(msg, rrobFailure);
listener.onFailure(new ElasticsearchException(msg, rrobFailure));
})
),
error -> {
logger.error("Failed to reindex old read-only indices", error);
removeReadOnlyBlock(client, indicesToUpgrade, ActionListener.wrap(
empty -> listener.onFailure(error),
removeReadOnlyBlockError -> {
String msg = "Failed making old indices read/write again after failing to reindex: " + error.getMessage();
logger.error(msg, removeReadOnlyBlockError);
listener.onFailure(new ElasticsearchException(msg, removeReadOnlyBlockError));
}
));
}
);
// <4> Old indexes are now readOnly, Time to reindex
ActionListener<AcknowledgedResponse> readOnlyListener = ActionListener.wrap(
ack -> reindexOldReadIndicesToNewIndices(client, indexNameAndAliasProvider.needsReindex(), request, reindexListener),
listener::onFailure
);
// <3> Set old indices to readOnly
ActionListener<AcknowledgedResponse> writeAliasesMovedListener = ActionListener.wrap(
resp -> setReadOnlyBlock(client, indicesToUpgrade, readOnlyListener),
listener::onFailure
);
// <2> Move write index alias to new write indices
ActionListener<AcknowledgedResponse> createWriteIndicesAndSetReadAliasListener = ActionListener.wrap(
resp -> adjustAliases(client,
indexNameAndAliasProvider.oldIndicesWithWriteAliases(),
indexNameAndAliasProvider.newWriteIndicesWithWriteAliases(),
writeAliasesMovedListener),
listener::onFailure
);
// <1> Create the new write indices and set the read aliases to include them
createNewWriteIndicesIfNecessary(client, metaData, indexNameAndAliasProvider.newWriteIndices(),
ActionListener.wrap(
indicesCreated -> adjustAliases(client,
Collections.emptyMap(),
indexNameAndAliasProvider.newWriteIndicesWithReadAliases(),
createWriteIndicesAndSetReadAliasListener),
listener::onFailure
));
} catch (Exception e) {
listener.onFailure(e);
}
}
private void createNewWriteIndicesIfNecessary(Client client,
MetaData metaData,
Collection<String> newWriteIndices,
ActionListener<Boolean> createIndexListener) {
TypedChainTaskExecutor<CreateIndexResponse> chainTaskExecutor =
new TypedChainTaskExecutor<>(
client.threadPool().executor(executor),
(createIndexResponse -> true), //We always want to complete all our tasks
(exception ->
// Short circuit execution IF the exception is NOT a ResourceAlreadyExistsException
// This should be rare, as it requires the index to be created between our previous check and this exception
exception instanceof ResourceAlreadyExistsException == false
));
newWriteIndices.forEach((index) -> {
// If the index already exists, don't try and created it
// We have already verified that IF this index exists, that it does not require upgrading
// So, if it was created between that check and this one, we can assume it is the correct version as it was JUST created
if (metaData.hasIndex(index) == false) {
CreateIndexRequest request = new CreateIndexRequest(index);
chainTaskExecutor.add(listener ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
listener,
client.admin().indices()::create));
}
});
chainTaskExecutor.execute(ActionListener.wrap(
createIndexResponses -> createIndexListener.onResponse(true),
createIndexListener::onFailure
));
}
/**
* Makes the indices readonly if it's not set as a readonly yet
*/
private void setReadOnlyBlock(Client client, List<String> indices, ActionListener<AcknowledgedResponse> listener) {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0]));
request.settings(settings);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
listener,
client.admin().indices()::updateSettings);
}
private void removeReadOnlyBlock(Client client, List<String> indices,
ActionListener<AcknowledgedResponse> listener) {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0]));
request.settings(settings);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
listener,
client.admin().indices()::updateSettings);
}
private void reindexOldReadIndicesToNewIndices(Client client,
Map<String, String> reindexIndices,
MlUpgradeAction.Request request,
ActionListener<Boolean> listener) {
TypedChainTaskExecutor<BulkByScrollResponse> chainTaskExecutor =
new TypedChainTaskExecutor<>(
client.threadPool().executor(executor),
(createIndexResponse) -> { // If there are errors in the reindex, we should stop
Tuple<RestStatus, Throwable> status = getStatusAndCause(createIndexResponse);
return status.v1().equals(RestStatus.OK);
},
(exception -> true)); // Short circuit and call onFailure for any exception
List<String> newIndices = new ArrayList<>(reindexIndices.size());
reindexIndices.forEach((oldIndex, newIndex) -> {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceBatchSize(request.getReindexBatchSize());
reindexRequest.setSourceIndices(oldIndex);
reindexRequest.setDestIndex(newIndex);
reindexRequest.setSourceDocTypes(ElasticsearchMappings.DOC_TYPE);
reindexRequest.setDestDocType(ElasticsearchMappings.DOC_TYPE);
// Don't worry if these indices already exist, we validated settings.index.created.version earlier
reindexRequest.setAbortOnVersionConflict(false);
// If the document exists already in the new index, don't want to update or overwrite as we are pulling from "old data"
reindexRequest.setDestOpType(DocWriteRequest.OpType.CREATE.getLowercase());
newIndices.add(newIndex);
chainTaskExecutor.add(chainedListener ->
executeAsyncWithOrigin(client,
ML_ORIGIN,
ReindexAction.INSTANCE,
reindexRequest,
chainedListener));
});
chainTaskExecutor.execute(ActionListener.wrap(
bulkScrollingResponses -> {
BulkByScrollResponse response = bulkScrollingResponses.get(bulkScrollingResponses.size() - 1);
Tuple<RestStatus, Throwable> status = getStatusAndCause(response);
if (status.v1().equals(RestStatus.OK)) {
listener.onResponse(true);
} else {
logger.error("Failed to reindex old results indices.", status.v2());
listener.onFailure(new ElasticsearchException("Failed to reindex old results indices.",status.v2()));
}
},
failure -> {
List<String> createdIndices = newIndices.subList(0, chainTaskExecutor.getCollectedResponses().size());
logger.error(
"Failed to reindex all old read indices. Successfully reindexed: [" +
Strings.collectionToCommaDelimitedString(createdIndices) + "]",
failure);
listener.onFailure(failure);
}
));
}
private void deleteOldIndices(Client client,
List<String> oldIndices,
ActionListener<AcknowledgedResponse> deleteIndicesListener) {
DeleteIndexRequest request = new DeleteIndexRequest(oldIndices.toArray(new String[0]));
request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
deleteIndicesListener,
client.admin().indices()::delete);
}
private void adjustAliases(Client client,
Map<String, List<Alias>> oldAliases,
Map<String, List<Alias>> newAliases,
ActionListener<AcknowledgedResponse> indicesAliasListener) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
oldAliases.forEach((oldIndex, aliases) ->
{
if (aliases.isEmpty() == false) { //if the aliases are empty, that means there are none to remove
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest
.AliasActions
.remove()
.index(oldIndex)
.aliases(aliases.stream().map(Alias::name).toArray(String[]::new)));
}
}
);
newAliases.forEach((newIndex, aliases) ->
aliases.forEach(alias -> {
IndicesAliasesRequest.AliasActions action = IndicesAliasesRequest.AliasActions.add().index(newIndex);
if (alias.filter() != null) {
action.filter(alias.filter());
}
action.alias(alias.name());
indicesAliasesRequest.addAliasAction(action);
})
);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
indicesAliasesRequest,
indicesAliasListener,
client.admin().indices()::aliases);
}
private static class IndexNameAndAliasProvider {
private final List<String> oldIndices;
private final Map<String, List<Alias>> writeAliases = new HashMap<>();
private final Map<String, List<Alias>> readAliases = new HashMap<>();
private IndexNameAndAliasProvider(List<String> oldIndices, MetaData metaData) {
this.oldIndices = oldIndices;
oldIndices.forEach(index -> {
IndexMetaData indexMetaData = metaData.index(index);
List<Alias> writes = new ArrayList<>();
List<Alias> reads = new ArrayList<>();
indexMetaData.getAliases().forEach(aliasCursor -> {
Alias alias = new Alias(aliasCursor.value.alias());
if (aliasCursor.value.filteringRequired()) {
alias.filter(aliasCursor.value.getFilter().string()); //Set the read alias jobId filter
}
if (alias.name().contains(".write-")) {
writes.add(alias);
} else {
reads.add(alias);
}
});
writeAliases.put(index, writes);
readAliases.put(index, reads);
});
}
private Exception validate(MetaData metaData, Predicate<IndexMetaData> shouldUpgrade) {
for (String index : oldIndices) {
String newWriteName = newWriteName(index);
// If the "new" indices exist, either they were created from a previous run of the upgrade process or the end user
if (validNewIndex(metaData, newWriteName, shouldUpgrade) == false) {
return new IllegalStateException("Index [" + newWriteName + "] already exists and is not the current version.");
}
String newReadName = newReadName(index);
if (validNewIndex(metaData, newReadName, shouldUpgrade) == false) {
return new IllegalStateException("Index [" + newReadName + "] already exists and is not the current version.");
}
}
return null;
}
private String newReadName(String oldIndexName) {
return oldIndexName + "-" + INDEX_VERSION + "r";
}
private String newWriteName(String oldIndexName) {
return oldIndexName + "-" + INDEX_VERSION;
}
private List<String> newWriteIndices() {
return oldIndices.stream().map(this::newWriteName).collect(Collectors.toList());
}
private List<Alias> readAliases(String oldIndex) {
return readAliases.get(oldIndex);
}
private List<Alias> writeAliases(String oldIndex) {
return writeAliases.get(oldIndex);
}
private Map<String, List<Alias>> newWriteIndicesWithReadAliases() {
return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::readAliases));
}
private Map<String, List<Alias>> oldIndicesWithWriteAliases() {
return writeAliases;
}
private Map<String, List<Alias>> newWriteIndicesWithWriteAliases() {
return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::writeAliases));
}
private Map<String, List<Alias>> oldIndicesWithReadAliases() {
return readAliases;
}
private Map<String, List<Alias>> newReadIndicesWithReadAliases() {
return oldIndices.stream().collect(Collectors.toMap(this::newReadName, this::readAliases));
}
private Map<String, String> needsReindex() {
return oldIndices.stream().collect(Collectors.toMap(Function.identity(), this::newReadName));
}
}
}

View File

@ -1,79 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService;
import static org.elasticsearch.xpack.ml.ResultsIndexUpgradeService.wasIndexCreatedInCurrentMajorVersion;
public class TransportMlUpgradeAction
extends TransportMasterNodeReadAction<MlUpgradeAction.Request, AcknowledgedResponse> {
private final Client client;
private final ResultsIndexUpgradeService resultsIndexUpgradeService;
@Inject
public TransportMlUpgradeAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, Client client,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(MlUpgradeAction.NAME, transportService, clusterService, threadPool,
actionFilters, MlUpgradeAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
executor(),
indexMetadata -> wasIndexCreatedInCurrentMajorVersion(indexMetadata) == false);
}
@Override
protected void masterOperation(Task task, MlUpgradeAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, taskId);
try {
resultsIndexUpgradeService.upgrade(parentAwareClient, request, state, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
protected final void masterOperation(MlUpgradeAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
@Override
protected ClusterBlockException checkBlock(MlUpgradeAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

View File

@ -1,76 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.results;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpgradeMlAction extends BaseRestHandler {
private static final DeprecationLogger deprecationLogger =
new DeprecationLogger(LogManager.getLogger(RestUpgradeMlAction.class));
public RestUpgradeMlAction(Settings settings, RestController controller) {
super(settings);
controller.registerWithDeprecatedHandler(
POST,
MachineLearning.BASE_PATH + "_upgrade",
this,
POST,
MachineLearning.PRE_V7_BASE_PATH + "_upgrade",
deprecationLogger);
}
@Override
public String getName() {
return "xpack_ml_upgrade_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
MlUpgradeAction.Request parsedRequest = new MlUpgradeAction.Request();
if (restRequest.hasContent()) {
XContentParser parser = restRequest.contentParser();
parsedRequest = MlUpgradeAction.Request.PARSER.apply(parser, null);
}
final MlUpgradeAction.Request upgradeRequest = parsedRequest;
if (restRequest.paramAsBoolean("wait_for_completion", false)) {
return channel -> client.execute(MlUpgradeAction.INSTANCE, upgradeRequest, new RestToXContentListener<>(channel));
} else {
upgradeRequest.setShouldStoreResult(true);
Task task = client.executeLocally(MlUpgradeAction.INSTANCE, upgradeRequest, LoggingTaskListener.instance());
// Send task description id instead of waiting for the message
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", client.getLocalNodeId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
};
}
}
}

View File

@ -1,21 +0,0 @@
{
"ml.upgrade": {
"documentation": "TODO",
"methods": [ "POST" ],
"url": {
"path": "/_ml/_upgrade",
"paths": [ "/_ml/_upgrade" ],
"params": {
"wait_for_completion": {
"type": "boolean",
"description": "Should this request wait until the operation has completed before returning",
"default": false
}
}
},
"body": {
"description" : "Upgrade options",
"required" : false
}
}
}

View File

@ -1,70 +0,0 @@
setup:
- skip:
features: headers
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_job:
job_id: jobs-upgrade-results
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"xcontent",
"time_field":"time"
}
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
Content-Type: application/json
index:
index: .ml-anomalies-jobs-upgrade-results
type: doc
id: "jobs-upgrade-results_1464739200000_1"
body:
{
"job_id": "jobs-upgrade-results",
"result_type": "bucket",
"timestamp": "2016-06-01T00:00:00Z",
"anomaly_score": 90.0,
"bucket_span":1
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: .ml-anomalies-jobs-upgrade-results
---
"Upgrade results when there is nothing to upgrade":
- do:
ml.upgrade:
wait_for_completion: true
- match: { acknowledged: true }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''
---
"Upgrade results when there is nothing to upgrade not waiting for results":
- do:
ml.upgrade:
wait_for_completion: false
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
tasks.get:
wait_for_completion: true
task_id: $task
- match: {completed: true}
- match: {response.acknowledged: true}

View File

@ -1,11 +0,0 @@
---
"Verify jobs exist":
- do:
ml.get_jobs:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
- do:
ml.get_jobs:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 1 }

View File

@ -1,120 +0,0 @@
---
"Put job on the old cluster and post some data":
- do:
ml.put_job:
job_id: old-cluster-job-to-upgrade
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: old-cluster-job-to-upgrade }
- do:
ml.open_job:
job_id: old-cluster-job-to-upgrade
- do:
ml.post_data:
job_id: old-cluster-job-to-upgrade
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: post-data-job
time: 1403481700
- match: { processed_record_count: 2 }
- do:
ml.close_job:
job_id: old-cluster-job-to-upgrade
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
# Wait for indices to be fully allocated before
# killing the node
- do:
cluster.health:
index: [".ml-state", ".ml-anomalies-shared"]
wait_for_status: green
---
"Put job on the old cluster with a custom index":
- do:
ml.put_job:
job_id: old-cluster-job-to-upgrade-custom
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
},
"results_index_name": "old-cluster-job-to-upgrade-custom"
}
- match: { job_id: old-cluster-job-to-upgrade-custom }
- do:
ml.open_job:
job_id: old-cluster-job-to-upgrade-custom
- do:
ml.post_data:
job_id: old-cluster-job-to-upgrade-custom
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: post-data-job
time: 1403481700
- airline: JZA
responsetime: 423.0000
sourcetype: post-data-job
time: 1403481800
- match: { processed_record_count: 3 }
- do:
ml.close_job:
job_id: old-cluster-job-to-upgrade-custom
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 3 }
# Wait for indices to be fully allocated before
# killing the node
- do:
cluster.health:
index: [".ml-state", ".ml-anomalies-old-cluster-job-to-upgrade-custom"]
wait_for_status: green

View File

@ -1,158 +0,0 @@
---
"Migrate results data to latest index binary version":
# Verify that all the results are there and the typical indices exist
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 3 }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''
- do:
indices.get_settings:
index: .ml-anomalies-shared
name: index.version.created
- match: { \.ml-anomalies-shared.settings.index.version.created: '/6\d+/' }
- do:
indices.exists:
index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom
- is_true: ''
# Do the upgrade
- do:
ml.upgrade:
wait_for_completion: true
- match: { acknowledged: true }
# Verify that old indices are gone
- do:
indices.exists:
index: .ml-anomalies-shared
- is_false: ''
- do:
indices.exists:
index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom
- is_false: ''
# Verify that results can still be retrieved
- do:
indices.refresh: {}
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 3 }
# Verify the created version is correct
- do:
indices.get_settings:
index: .ml-anomalies-old-cluster-job-to-upgrade
name: index.version.created
- match: { \.ml-anomalies-shared-7.settings.index.version.created: '/7\d+/' }
- match: { \.ml-anomalies-shared-7r.settings.index.version.created: '/7\d+/' }
- do:
indices.get_settings:
index: .ml-anomalies-old-cluster-job-to-upgrade-custom
name: index.version.created
- match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7.settings.index.version.created: '/7\d+/' }
- match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7r.settings.index.version.created: '/7\d+/' }
# Create a new job to verify that the .ml-anomalies-shared index gets created again without issues
- do:
ml.put_job:
job_id: upgraded-cluster-job-should-not-upgrade
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: upgraded-cluster-job-should-not-upgrade }
- do:
ml.open_job:
job_id: upgraded-cluster-job-should-not-upgrade
- do:
ml.post_data:
job_id: upgraded-cluster-job-should-not-upgrade
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: post-data-job
time: 1403481700
- match: { processed_record_count: 2 }
- do:
ml.close_job:
job_id: upgraded-cluster-job-should-not-upgrade
- do:
ml.get_buckets:
job_id: upgraded-cluster-job-should-not-upgrade
- match: { count: 1 }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''
- do:
indices.get_settings:
index: .ml-anomalies-shared
name: index.version.created
- match: { \.ml-anomalies-shared.settings.index.version.created: '/7\d+/' }
# Do the upgrade Again as nothing needs upgraded now
- do:
ml.upgrade:
wait_for_completion: true
- match: { acknowledged: true }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''