ML: add migrate anomalies assistant (#36643)

* ML: add migrate anomalies assistant

* adjusting failure handling for reindex

* Fixing request and tests

* Adding tests to blacklist

* adjusting test

* test fix: posting data directly to the job instead of relying on datafeed

* adjusting API usage

* adding TODOs and adjusting endpoint

* Adding types to reindexRequest

* removing unreliable "live" data test

* adding index refresh to test

* adding index refresh to test

* adding index refresh to yaml test

* fixing bad exists call

* removing todo

* Addressing review comments

* Adjusting rest endpoint name

* making service have its own logger

* adjusting validity check for new index names

* fixing typos

* fixing renaming
Benjamin Trent 2019-01-09 14:25:35 -06:00 committed by GitHub
parent c71060fa01
commit df3b58cb04
22 changed files with 1826 additions and 111 deletions
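
In short, this PR adds a new cluster action (cluster:admin/xpack/ml/upgrade) backed by a ResultsIndexUpgradeService that reindexes ML anomaly results indices created in a previous major version, plus a REST endpoint wired into the MachineLearning plugin. As a hedged sketch of the Java entry point, mirroring how MlUpgradeIT below invokes it (reindex_batch_size is the only request option and defaults to 1000):

// Sketch only; `client` is any org.elasticsearch.client.Client, as in the integration tests below.
AcknowledgedResponse resp = client.execute(MlUpgradeAction.INSTANCE,
        new MlUpgradeAction.Request().setReindexBatchSize(1000)).actionGet();
assert resp.isAcknowledged();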

View File

@@ -105,6 +105,7 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@@ -289,6 +290,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
FindFileStructureAction.INSTANCE,
MlUpgradeAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,

View File

@@ -0,0 +1,160 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class MlUpgradeAction extends Action<AcknowledgedResponse> {
public static final MlUpgradeAction INSTANCE = new MlUpgradeAction();
public static final String NAME = "cluster:admin/xpack/ml/upgrade";
private MlUpgradeAction() {
super(NAME);
}
@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
public static class Request extends MasterNodeReadRequest<Request> implements ToXContentObject {
private static final ParseField REINDEX_BATCH_SIZE = new ParseField("reindex_batch_size");
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>("ml_upgrade", true, Request::new);
static {
PARSER.declareInt(Request::setReindexBatchSize, REINDEX_BATCH_SIZE);
}
static final String INDEX = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "*";
private int reindexBatchSize = 1000;
/**
* Should this task store its result?
*/
private boolean shouldStoreResult;
// for serialization
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
reindexBatchSize = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(reindexBatchSize);
}
public String[] indices() {
return new String[]{INDEX};
}
public IndicesOptions indicesOptions() {
return IndicesOptions.strictExpandOpenAndForbidClosed();
}
/**
* Should this task store its result after it has finished?
*/
public Request setShouldStoreResult(boolean shouldStoreResult) {
this.shouldStoreResult = shouldStoreResult;
return this;
}
@Override
public boolean getShouldStoreResult() {
return shouldStoreResult;
}
public Request setReindexBatchSize(int reindexBatchSize) {
this.reindexBatchSize = reindexBatchSize;
return this;
}
public int getReindexBatchSize() {
return reindexBatchSize;
}
@Override
public ActionRequestValidationException validate() {
if (reindexBatchSize <= 0) {
ActionRequestValidationException validationException = new ActionRequestValidationException();
validationException.addValidationError("[" + REINDEX_BATCH_SIZE.getPreferredName() + "] must be greater than 0.");
return validationException;
}
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Request request = (Request) o;
return Objects.equals(reindexBatchSize, request.reindexBatchSize);
}
@Override
public int hashCode() {
return Objects.hash(reindexBatchSize);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "ml-upgrade", parentTaskId, headers) {
@Override
public boolean shouldCancelChildrenOnCancellation() {
return true;
}
};
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(REINDEX_BATCH_SIZE.getPreferredName(), reindexBatchSize);
builder.endObject();
return builder;
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, AcknowledgedResponse, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
}
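
Usage note (not part of the diff): the validate() override above rejects non-positive batch sizes before the request is routed to the master node. A minimal sketch of that behavior:

// Hedged sketch of the validation defined above.
MlUpgradeAction.Request bad = new MlUpgradeAction.Request().setReindexBatchSize(0);
ActionRequestValidationException e = bad.validate();
// e is non-null and its message contains "[reindex_batch_size] must be greater than 0."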

View File

@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class MlUpgradeRequestTests extends AbstractWireSerializingTestCase<MlUpgradeAction.Request> {
@Override
protected MlUpgradeAction.Request createTestInstance() {
MlUpgradeAction.Request request = new MlUpgradeAction.Request();
if (randomBoolean()) {
request.setReindexBatchSize(randomIntBetween(1, 10_000));
}
return request;
}
@Override
protected Writeable.Reader<MlUpgradeAction.Request> instanceReader() {
return MlUpgradeAction.Request::new;
}
}

View File

@@ -93,7 +93,9 @@ integTestRunner {
'ml/validate/Test job config that is invalid only because of the job ID',
'ml/validate_detector/Test invalid detector',
'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts',
'ml/delete_forecast/Test delete forecast on missing forecast'
'ml/delete_forecast/Test delete forecast on missing forecast',
'ml/ml_upgrade/Upgrade results when there is nothing to upgrade',
'ml/ml_upgrade/Upgrade results when there is nothing to upgrade not waiting for results'
].join(',')
}

View File

@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.plugins.Plugin;
@@ -120,7 +121,7 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
}
@Override

View File

@@ -0,0 +1,378 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService;
import org.junit.After;
import org.junit.Assert;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
public class MlUpgradeIT extends MlNativeAutodetectIntegTestCase {
@After
public void cleanup() throws Exception {
cleanUp();
}
public void testMigrationWhenItIsNotNecessary() throws Exception {
String jobId1 = "no-migration-test1";
String jobId2 = "no-migration-test2";
String jobId3 = "no-migration-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
long job1Total = getTotalDocCount(job1Index);
long job2Total = getTotalDocCount(job2Index);
long job3Total = getTotalDocCount(job3Index);
AcknowledgedResponse resp = ESIntegTestCase.client().execute(MlUpgradeAction.INSTANCE,
new MlUpgradeAction.Request()).actionGet();
assertThat(resp.isAcknowledged(), is(true));
// Migration should have done nothing
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
assertThat(getTotalDocCount(job1Index), equalTo(job1Total));
assertThat(getTotalDocCount(job2Index), equalTo(job2Total));
assertThat(getTotalDocCount(job3Index), equalTo(job3Total));
ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
// We should have two backing indices, as there is one shared index and one custom index
assertThat(indices.length, equalTo(2));
}
public void testMigration() throws Exception {
String jobId1 = "migration-test1";
String jobId2 = "migration-test2";
String jobId3 = "migration-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
long job1Total = getJobResultsCount(job1.getId());
long job2Total = getJobResultsCount(job2.getId());
long job3Total = getJobResultsCount(job3.getId());
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
ThreadPool.Names.SAME,
indexMetaData -> true);
PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
new MlUpgradeAction.Request(),
ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
future);
AcknowledgedResponse response = future.get();
assertThat(response.isAcknowledged(), is(true));
assertThat(indexExists(job1Index), is(false));
assertThat(indexExists(job2Index), is(false));
assertThat(indexExists(job3Index), is(false));
ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
// We should have four backing indices, as there is a shared and a custom index and upgrading doubles the number of indices
Assert.assertThat(indices.length, equalTo(4));
refresh(indices);
assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total));
assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total));
assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total));
// We should still be able to write, and the aliases should allow reading from the appropriate indices
postDataToJob(jobId1);
postDataToJob(jobId2);
postDataToJob(jobId3);
// We should also be able to create new jobs and old jobs should be unaffected.
String jobId4 = "migration-test4";
Job job4 = createAndOpenJobAndStartDataFeedWithData(jobId4, dataIndex, false);
waitUntilJobIsClosed(jobId4);
indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
refresh(indices);
long newJob1Total = getJobResultsCount(job1.getId());
assertThat(newJob1Total, greaterThan(job1Total));
long newJob2Total = getJobResultsCount(job2.getId());
assertThat(newJob2Total, greaterThan(job2Total));
long newJob3Total = getJobResultsCount(job3.getId());
assertThat(newJob3Total, greaterThan(job3Total));
assertThat(getJobResultsCount(jobId4), greaterThan(0L));
assertThat(getJobResultsCount(jobId1), equalTo(newJob1Total));
assertThat(getJobResultsCount(jobId2), equalTo(newJob2Total));
assertThat(getJobResultsCount(jobId3), equalTo(newJob3Total));
}
//I think this test name could be a little bit longer....
public void testMigrationWithManuallyCreatedIndexThatNeedsMigrating() throws Exception {
String jobId1 = "migration-failure-test1";
String jobId2 = "migration-failure-test2";
String jobId3 = "migration-failure-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
// This index name should match one of the automatically created migration indices
String manuallyCreatedIndex = job1Index + "-" + Version.CURRENT.major;
client().admin().indices().prepareCreate(manuallyCreatedIndex).execute().actionGet();
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
ThreadPool.Names.SAME,
indexMetaData -> true); //indicates that this manually created index needs migrating
resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
new MlUpgradeAction.Request(),
ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
ActionListener.wrap(
resp -> fail(),
exception -> {
assertThat(exception, instanceOf(IllegalStateException.class));
assertThat(exception.getMessage(),
equalTo("Index [" + manuallyCreatedIndex + "] already exists and is not the current version."));
}
));
}
public void testMigrationWithExistingIndexWithData() throws Exception {
String jobId1 = "partial-migration-test1";
String jobId2 = "partial-migration-test2";
String jobId3 = "partial-migration-test3";
String dataIndex = createDataIndex().v2();
List<Job> jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex);
Job job1 = jobs.get(0);
Job job2 = jobs.get(1);
Job job3 = jobs.get(2);
String job1Index = job1.getResultsIndexName();
String job2Index = job2.getResultsIndexName();
String job3Index = job3.getResultsIndexName();
assertThat(indexExists(job1Index), is(true));
assertThat(indexExists(job2Index), is(true));
assertThat(indexExists(job3Index), is(true));
long job1Total = getJobResultsCount(job1.getId());
long job2Total = getJobResultsCount(job2.getId());
long job3Total = getJobResultsCount(job3.getId());
//let's manually create a READ index that already contains reindexed data
// Should still get aliased appropriately without any additional/duplicate data.
String alreadyMigratedIndex = job1Index + "-" + Version.CURRENT.major + "r";
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(job1Index);
reindexRequest.setDestIndex(alreadyMigratedIndex);
client().execute(ReindexAction.INSTANCE, reindexRequest).actionGet();
//New write index as well, should still get aliased appropriately
String alreadyMigratedWriteIndex = job1Index + "-" + Version.CURRENT.major;
client().admin().indices().prepareCreate(alreadyMigratedWriteIndex).execute().actionGet();
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
ThreadPool.Names.SAME,
//indicates that this manually created index is already migrated and should not be included in our migration steps
indexMetaData -> !(indexMetaData.getIndex().getName().equals(alreadyMigratedIndex) ||
indexMetaData.getIndex().getName().equals(alreadyMigratedWriteIndex)));
PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(),
new MlUpgradeAction.Request(),
ESIntegTestCase.client().admin().cluster().prepareState().get().getState(),
future);
AcknowledgedResponse response = future.get();
assertThat(response.isAcknowledged(), is(true));
assertThat(indexExists(job1Index), is(false));
assertThat(indexExists(job2Index), is(false));
assertThat(indexExists(job3Index), is(false));
ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState();
String[] indices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.strictExpandOpenAndForbidClosed(),
AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*");
// We should have four backing indices, as there is a shared and a custom index and upgrading doubles the number of indices
Assert.assertThat(indices.length, equalTo(4));
refresh(indices);
assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total));
assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total));
assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total));
// We should still be able to write, and the aliases should allow reading from the appropriate indices
postDataToJob(jobId1);
postDataToJob(jobId2);
postDataToJob(jobId3);
refresh(indices);
long newJob1Total = getJobResultsCount(job1.getId());
assertThat(newJob1Total, greaterThan(job1Total));
long newJob2Total = getJobResultsCount(job2.getId());
assertThat(newJob2Total, greaterThan(job2Total));
long newJob3Total = getJobResultsCount(job3.getId());
assertThat(newJob3Total, greaterThan(job3Total));
}
private long getTotalDocCount(String indexName) {
SearchResponse searchResponse = ESIntegTestCase.client().prepareSearch(indexName)
.setSize(10_000)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.matchAllQuery())
.execute().actionGet();
return searchResponse.getHits().getTotalHits().value;
}
private long getJobResultsCount(String jobId) {
String index = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + jobId;
return getTotalDocCount(index);
}
private void postDataToJob(String jobId) throws Exception {
openJob(jobId);
ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(jobId).get(0).getState(), JobState.OPENED));
startDatafeed(jobId + "-datafeed", 0L, System.currentTimeMillis());
waitUntilJobIsClosed(jobId);
}
private Job createAndOpenJobAndStartDataFeedWithData(String jobId, String dataIndex, boolean isCustom) throws Exception {
Job.Builder jobbuilder = createScheduledJob(jobId);
if (isCustom) {
jobbuilder.setResultsIndexName(jobId);
}
registerJob(jobbuilder);
Job job = putJob(jobbuilder).getResponse();
openJob(job.getId());
ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
DatafeedConfig.Builder builder = createDatafeedBuilder(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList(dataIndex));
builder.setQueryDelay(TimeValue.timeValueSeconds(5));
builder.setFrequency(TimeValue.timeValueSeconds(5));
DatafeedConfig datafeedConfig = builder.build();
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, System.currentTimeMillis());
waitUntilJobIsClosed(jobId);
return job;
}
private Tuple<Long, String> createDataIndex() {
ESIntegTestCase.client().admin().indices().prepareCreate("data-for-migration-1")
.addMapping("type", "time", "type=date")
.get();
long numDocs = ESTestCase.randomIntBetween(32, 512);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs(logger, "data-for-migration-1", numDocs, twoWeeksAgo, oneWeekAgo);
return new Tuple<>(numDocs, "data-for-migration-1");
}
private List<Job> createJobsWithData(String sharedJobId1, String sharedJobId2, String customJobId, String dataIndex) throws Exception {
Job job1 = createAndOpenJobAndStartDataFeedWithData(sharedJobId1, dataIndex, false);
Job job2 = createAndOpenJobAndStartDataFeedWithData(sharedJobId2, dataIndex, false);
Job job3 = createAndOpenJobAndStartDataFeedWithData(customJobId, dataIndex, true);
return Arrays.asList(job1, job2, job3);
}
}

View File

@@ -96,6 +96,7 @@ import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@@ -150,6 +151,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutCalendarAction;
import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportPutFilterAction;
import org.elasticsearch.xpack.ml.action.TransportPutJobAction;
import org.elasticsearch.xpack.ml.action.TransportMlUpgradeAction;
import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction;
@@ -229,6 +231,7 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetCategoriesAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetInfluencersAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.results.RestUpgradeMlAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
@@ -541,7 +544,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new RestPutCalendarJobAction(settings, restController),
new RestGetCalendarEventsAction(settings, restController),
new RestPostCalendarEventAction(settings, restController),
new RestFindFileStructureAction(settings, restController)
new RestFindFileStructureAction(settings, restController),
new RestUpgradeMlAction(settings, restController)
);
}
@@ -599,7 +603,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class),
new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class),
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class)
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class),
new ActionHandler<>(MlUpgradeAction.INSTANCE, TransportMlUpgradeAction.class)
);
}
@Override

View File

@@ -44,7 +44,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
import java.io.IOException;
import java.util.ArrayList;
@@ -179,9 +179,9 @@ public class MlConfigMigrator {
}
private void migrateBatches(List<JobsAndDatafeeds> batches, ActionListener<Boolean> listener) {
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
for (JobsAndDatafeeds batch : batches) {
chainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
voidChainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
failedDocumentIds -> {
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
List<String> successfulDatafeedWrites =
@@ -191,7 +191,7 @@ public class MlConfigMigrator {
chainedListener::onFailure
)));
}
chainTaskExecutor.execute(ActionListener.wrap(aVoid -> listener.onResponse(true), listener::onFailure));
voidChainTaskExecutor.execute(ActionListener.wrap(aVoids -> listener.onResponse(true), listener::onFailure));
}
// Exposed for testing
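
Aside (not part of the diff): the rename reflects that the executor now hands the collected List<Void> to the final listener, hence the aVoid -> aVoids change above. A minimal sketch of the pattern, assuming only the constructor and methods shown in this diff:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;

// Sketch only: two chained no-op tasks on a direct executor, short-circuiting on failure.
VoidChainTaskExecutor executor = new VoidChainTaskExecutor(EsExecutors.newDirectExecutorService(), true);
executor.add(chainedListener -> chainedListener.onResponse(null)); // task 1
executor.add(chainedListener -> chainedListener.onResponse(null)); // task 2
executor.execute(ActionListener.wrap(
        aVoids -> { /* all tasks completed */ },
        e -> { /* a task failed; with shortCircuit == true the remaining tasks were skipped */ }));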

View File

@@ -0,0 +1,513 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* ML Job results index upgrade service
*/
public class ResultsIndexUpgradeService {
private static final Logger logger = LogManager.getLogger(ResultsIndexUpgradeService.class);
// Adjust the following constants as necessary for various versions and backports.
private static final int INDEX_VERSION = Version.CURRENT.major;
private static final Version MIN_REQUIRED_VERSION = Version.CURRENT.minimumCompatibilityVersion();
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Predicate<IndexMetaData> shouldUpgrade;
private final String executor;
/**
* Construct a new upgrade service
*
* @param indexNameExpressionResolver Index expression resolver for the request
* @param executor Where to execute client calls
* @param shouldUpgrade Predicate that, given an index's IndexMetaData, indicates whether it should be upgraded;
* {@code true} indicates that it SHOULD upgrade
*/
public ResultsIndexUpgradeService(IndexNameExpressionResolver indexNameExpressionResolver,
String executor,
Predicate<IndexMetaData> shouldUpgrade) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.shouldUpgrade = shouldUpgrade;
this.executor = executor;
}
public static boolean wasIndexCreatedInCurrentMajorVersion(IndexMetaData indexMetaData) {
return indexMetaData.getCreationVersion().major == INDEX_VERSION;
}
/**
* There are two reasons for these indices to exist:
* 1. The upgrade process has run before and either failed for some reason, or the end user is simply running it again.
* Either way, it should be ok to proceed as this action SHOULD be idempotent,
* unless the shouldUpgrade predicate is poorly formed
* 2. This index was created manually by the user. If the index was created manually and actually needs upgrading, then
* we consider the "new index" to be invalid as the passed predicate indicates that it still needs upgrading.
*
* @param metaData Cluster metadata
* @param newIndexName The index to check
* @param shouldUpgrade Whether the index should be upgraded
* @return {@code true} if the "new index" is valid
*/
private static boolean validNewIndex(MetaData metaData, String newIndexName, Predicate<IndexMetaData> shouldUpgrade) {
return (metaData.hasIndex(newIndexName) && shouldUpgrade.test(metaData.index(newIndexName))) == false;
}
private static void validateMinNodeVersion(ClusterState clusterState) {
if (clusterState.nodes().getMinNodeVersion().before(MIN_REQUIRED_VERSION)) {
throw new IllegalStateException("All nodes should have at least version [" + MIN_REQUIRED_VERSION + "] to upgrade");
}
}
// This method copies the behavior of the normal {index}/_upgrade rest response handler
private static Tuple<RestStatus, Throwable> getStatusAndCause(BulkByScrollResponse response) {
/*
* Return the highest numbered rest status under the assumption that higher numbered statuses are "more error"
* and thus more interesting to the user.
*/
RestStatus status = RestStatus.OK;
Throwable cause = null;
if (response.isTimedOut()) {
status = RestStatus.REQUEST_TIMEOUT;
cause = new ElasticsearchTimeoutException("Reindex request timed out");
}
for (BulkItemResponse.Failure failure : response.getBulkFailures()) {
if (failure.getStatus().getStatus() > status.getStatus()) {
status = failure.getStatus();
cause = failure.getCause();
}
}
for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) {
RestStatus failureStatus = ExceptionsHelper.status(failure.getReason());
if (failureStatus.getStatus() > status.getStatus()) {
status = failureStatus;
cause = failure.getReason();
}
}
return new Tuple<>(status, cause);
}
/**
* Upgrade the indices given in the request.
*
* @param client The client to use when making calls
* @param request The upgrade request
* @param state The current cluster state
* @param listener The listener to alert when actions have completed
*/
public void upgrade(Client client, MlUpgradeAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
try {
validateMinNodeVersion(state);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices());
MetaData metaData = state.getMetaData();
List<String> indicesToUpgrade = Arrays.stream(concreteIndices)
.filter(indexName -> shouldUpgrade.test(metaData.index(indexName)))
.collect(Collectors.toList());
// All the internal indices are up to date
if (indicesToUpgrade.isEmpty()) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
IndexNameAndAliasProvider indexNameAndAliasProvider = new IndexNameAndAliasProvider(indicesToUpgrade, metaData);
Exception validationException = indexNameAndAliasProvider.validate(metaData, shouldUpgrade);
if (validationException != null) {
listener.onFailure(validationException);
return;
}
// <7> Now that we have deleted the old indices, we are complete, alert the user
ActionListener<AcknowledgedResponse> deleteIndicesListener = ActionListener.wrap(
listener::onResponse,
error -> {
String msg = "Failed to delete old indices: " + Strings.collectionToCommaDelimitedString(indicesToUpgrade);
logger.error(msg, error);
listener.onFailure(new ElasticsearchException(msg, error));
}
);
// <6> Now that aliases are moved, we need to delete the old indices
ActionListener<AcknowledgedResponse> readAliasListener = ActionListener.wrap(
resp -> deleteOldIndices(client, indicesToUpgrade, deleteIndicesListener),
error -> {
String msg = "Failed adjusting aliases from old indices to new.";
logger.error(msg, error);
listener.onFailure(new ElasticsearchException(msg, error));
}
);
// <5> Documents are now reindexed, time to move read aliases
ActionListener<Boolean> reindexListener = ActionListener.wrap(
resp ->
// Need to make indices writable again so that the aliases can be removed from them
removeReadOnlyBlock(client, indicesToUpgrade,
ActionListener.wrap(
rrob -> adjustAliases(client,
indexNameAndAliasProvider.oldIndicesWithReadAliases(),
indexNameAndAliasProvider.newReadIndicesWithReadAliases(),
readAliasListener),
rrobFailure -> {
String msg = "Failed making old indices writable again so that aliases can be moved.";
logger.error(msg, rrobFailure);
listener.onFailure(new ElasticsearchException(msg, rrobFailure));
})
),
error -> {
logger.error("Failed to reindex old read-only indices", error);
removeReadOnlyBlock(client, indicesToUpgrade, ActionListener.wrap(
empty -> listener.onFailure(error),
removeReadOnlyBlockError -> {
String msg = "Failed making old indices read/write again after failing to reindex: " + error.getMessage();
logger.error(msg, removeReadOnlyBlockError);
listener.onFailure(new ElasticsearchException(msg, removeReadOnlyBlockError));
}
));
}
);
// <4> Old indices are now read-only, time to reindex
ActionListener<AcknowledgedResponse> readOnlyListener = ActionListener.wrap(
ack -> reindexOldReadIndicesToNewIndices(client, indexNameAndAliasProvider.needsReindex(), request, reindexListener),
listener::onFailure
);
// <3> Set old indices to readOnly
ActionListener<AcknowledgedResponse> writeAliasesMovedListener = ActionListener.wrap(
resp -> setReadOnlyBlock(client, indicesToUpgrade, readOnlyListener),
listener::onFailure
);
// <2> Move write index alias to new write indices
ActionListener<AcknowledgedResponse> createWriteIndicesAndSetReadAliasListener = ActionListener.wrap(
resp -> adjustAliases(client,
indexNameAndAliasProvider.oldIndicesWithWriteAliases(),
indexNameAndAliasProvider.newWriteIndicesWithWriteAliases(),
writeAliasesMovedListener),
listener::onFailure
);
// <1> Create the new write indices and set the read aliases to include them
createNewWriteIndicesIfNecessary(client, metaData, indexNameAndAliasProvider.newWriteIndices(),
ActionListener.wrap(
indicesCreated -> adjustAliases(client,
Collections.emptyMap(),
indexNameAndAliasProvider.newWriteIndicesWithReadAliases(),
createWriteIndicesAndSetReadAliasListener),
listener::onFailure
));
} catch (Exception e) {
listener.onFailure(e);
}
}
private void createNewWriteIndicesIfNecessary(Client client,
MetaData metaData,
Collection<String> newWriteIndices,
ActionListener<Boolean> createIndexListener) {
TypedChainTaskExecutor<CreateIndexResponse> chainTaskExecutor =
new TypedChainTaskExecutor<>(
client.threadPool().executor(executor),
(createIndexResponse -> true), //We always want to complete all our tasks
(exception ->
// Short circuit execution IF the exception is NOT a ResourceAlreadyExistsException
// This should be rare, as it requires the index to be created between our previous check and this exception
exception instanceof ResourceAlreadyExistsException == false
));
newWriteIndices.forEach((index) -> {
// If the index already exists, don't try to create it
// We have already verified that IF this index exists, that it does not require upgrading
// So, if it was created between that check and this one, we can assume it is the correct version as it was JUST created
if (metaData.hasIndex(index) == false) {
CreateIndexRequest request = new CreateIndexRequest(index);
chainTaskExecutor.add(listener ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
listener,
client.admin().indices()::create));
}
});
chainTaskExecutor.execute(ActionListener.wrap(
createIndexResponses -> createIndexListener.onResponse(true),
createIndexListener::onFailure
));
}
/**
* Makes the indices read-only if they are not read-only yet
*/
private void setReadOnlyBlock(Client client, List<String> indices, ActionListener<AcknowledgedResponse> listener) {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0]));
request.settings(settings);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
listener,
client.admin().indices()::updateSettings);
}
private void removeReadOnlyBlock(Client client, List<String> indices,
ActionListener<AcknowledgedResponse> listener) {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0]));
request.settings(settings);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
listener,
client.admin().indices()::updateSettings);
}
private void reindexOldReadIndicesToNewIndices(Client client,
Map<String, String> reindexIndices,
MlUpgradeAction.Request request,
ActionListener<Boolean> listener) {
TypedChainTaskExecutor<BulkByScrollResponse> chainTaskExecutor =
new TypedChainTaskExecutor<>(
client.threadPool().executor(executor),
(createIndexResponse) -> { // If there are errors in the reindex, we should stop
Tuple<RestStatus, Throwable> status = getStatusAndCause(createIndexResponse);
return status.v1().equals(RestStatus.OK);
},
(exception -> true)); // Short circuit and call onFailure for any exception
List<String> newIndices = new ArrayList<>(reindexIndices.size());
reindexIndices.forEach((oldIndex, newIndex) -> {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceBatchSize(request.getReindexBatchSize());
reindexRequest.setSourceIndices(oldIndex);
reindexRequest.setDestIndex(newIndex);
reindexRequest.setSourceDocTypes(ElasticsearchMappings.DOC_TYPE);
reindexRequest.setDestDocType(ElasticsearchMappings.DOC_TYPE);
// Don't worry if these indices already exist, we validated index.version.created earlier
reindexRequest.setAbortOnVersionConflict(false);
// If the document already exists in the new index, we don't want to update or overwrite it, as we are pulling from "old data"
reindexRequest.setDestOpType(DocWriteRequest.OpType.CREATE.getLowercase());
newIndices.add(newIndex);
chainTaskExecutor.add(chainedListener ->
executeAsyncWithOrigin(client,
ML_ORIGIN,
ReindexAction.INSTANCE,
reindexRequest,
chainedListener));
});
chainTaskExecutor.execute(ActionListener.wrap(
bulkScrollingResponses -> {
BulkByScrollResponse response = bulkScrollingResponses.get(bulkScrollingResponses.size() - 1);
Tuple<RestStatus, Throwable> status = getStatusAndCause(response);
if (status.v1().equals(RestStatus.OK)) {
listener.onResponse(true);
} else {
logger.error("Failed to reindex old results indices.", status.v2());
listener.onFailure(new ElasticsearchException("Failed to reindex old results indices.", status.v2()));
}
},
failure -> {
List<String> createdIndices = newIndices.subList(0, chainTaskExecutor.getCollectedResponses().size());
logger.error(
"Failed to reindex all old read indices. Successfully reindexed: [" +
Strings.collectionToCommaDelimitedString(createdIndices) + "]",
failure);
listener.onFailure(failure);
}
));
}
private void deleteOldIndices(Client client,
List<String> oldIndices,
ActionListener<AcknowledgedResponse> deleteIndicesListener) {
DeleteIndexRequest request = new DeleteIndexRequest(oldIndices.toArray(new String[0]));
request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
deleteIndicesListener,
client.admin().indices()::delete);
}
private void adjustAliases(Client client,
Map<String, List<Alias>> oldAliases,
Map<String, List<Alias>> newAliases,
ActionListener<AcknowledgedResponse> indicesAliasListener) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
oldAliases.forEach((oldIndex, aliases) ->
{
if (aliases.isEmpty() == false) { //if the aliases are empty, that means there are none to remove
indicesAliasesRequest.addAliasAction(IndicesAliasesRequest
.AliasActions
.remove()
.index(oldIndex)
.aliases(aliases.stream().map(Alias::name).toArray(String[]::new)));
}
}
);
newAliases.forEach((newIndex, aliases) ->
aliases.forEach(alias -> {
IndicesAliasesRequest.AliasActions action = IndicesAliasesRequest.AliasActions.add().index(newIndex);
if (alias.filter() != null) {
action.filter(alias.filter());
}
action.alias(alias.name());
indicesAliasesRequest.addAliasAction(action);
})
);
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
indicesAliasesRequest,
indicesAliasListener,
client.admin().indices()::aliases);
}
private static class IndexNameAndAliasProvider {
private final List<String> oldIndices;
private final Map<String, List<Alias>> writeAliases = new HashMap<>();
private final Map<String, List<Alias>> readAliases = new HashMap<>();
private IndexNameAndAliasProvider(List<String> oldIndices, MetaData metaData) {
this.oldIndices = oldIndices;
oldIndices.forEach(index -> {
IndexMetaData indexMetaData = metaData.index(index);
List<Alias> writes = new ArrayList<>();
List<Alias> reads = new ArrayList<>();
indexMetaData.getAliases().forEach(aliasCursor -> {
Alias alias = new Alias(aliasCursor.value.alias());
if (aliasCursor.value.filteringRequired()) {
alias.filter(aliasCursor.value.getFilter().string()); //Set the read alias jobId filter
}
if (alias.name().contains(".write-")) {
writes.add(alias);
} else {
reads.add(alias);
}
});
writeAliases.put(index, writes);
readAliases.put(index, reads);
});
}
private Exception validate(MetaData metaData, Predicate<IndexMetaData> shouldUpgrade) {
for (String index : oldIndices) {
String newWriteName = newWriteName(index);
// If the "new" indices exist, either they were created from a previous run of the upgrade process or the end user
if (validNewIndex(metaData, newWriteName, shouldUpgrade) == false) {
return new IllegalStateException("Index [" + newWriteName + "] already exists and is not the current version.");
}
String newReadName = newReadName(index);
if (validNewIndex(metaData, newReadName, shouldUpgrade) == false) {
return new IllegalStateException("Index [" + newReadName + "] already exists and is not the current version.");
}
}
return null;
}
private String newReadName(String oldIndexName) {
return oldIndexName + "-" + INDEX_VERSION + "r";
}
private String newWriteName(String oldIndexName) {
return oldIndexName + "-" + INDEX_VERSION;
}
private List<String> newWriteIndices() {
return oldIndices.stream().map(this::newWriteName).collect(Collectors.toList());
}
private List<Alias> readAliases(String oldIndex) {
return readAliases.get(oldIndex);
}
private List<Alias> writeAliases(String oldIndex) {
return writeAliases.get(oldIndex);
}
private Map<String, List<Alias>> newWriteIndicesWithReadAliases() {
return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::readAliases));
}
private Map<String, List<Alias>> oldIndicesWithWriteAliases() {
return writeAliases;
}
private Map<String, List<Alias>> newWriteIndicesWithWriteAliases() {
return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::writeAliases));
}
private Map<String, List<Alias>> oldIndicesWithReadAliases() {
return readAliases;
}
private Map<String, List<Alias>> newReadIndicesWithReadAliases() {
return oldIndices.stream().collect(Collectors.toMap(this::newReadName, this::readAliases));
}
private Map<String, String> needsReindex() {
return oldIndices.stream().collect(Collectors.toMap(Function.identity(), this::newReadName));
}
}
}
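
To make the renaming scheme concrete (derived from newWriteName/newReadName above; the suffix is Version.CURRENT.major, shown here as 7 purely for illustration):

// Illustration only, assuming the current major version is 7:
//   old index:       .ml-anomalies-shared
//   new write index: .ml-anomalies-shared-7   (created empty; added to the read aliases, takes over the .write- aliases)
//   new read index:  .ml-anomalies-shared-7r  (reindex destination; takes over the read aliases)
// Once the aliases have moved, the old index is deleted.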

View File

@@ -26,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
import java.util.Collections;
import java.util.Date;
@@ -65,7 +65,7 @@ public class TransportFinalizeJobExecutionAction extends TransportMasterNodeActi
String jobIdString = String.join(",", request.getJobIds());
logger.debug("finalizing jobs [{}]", jobIdString);
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor(
VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.executor(
MachineLearning.UTILITY_THREAD_POOL_NAME), true);
Map<String, Object> update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date());
@@ -77,7 +77,7 @@ public class TransportFinalizeJobExecutionAction extends TransportMasterNodeActi
updateRequest.doc(update);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
chainTaskExecutor.add(chainedListener -> {
voidChainTaskExecutor.add(chainedListener -> {
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
updateResponse -> chainedListener.onResponse(null),
chainedListener::onFailure
@@ -85,8 +85,8 @@ public class TransportFinalizeJobExecutionAction extends TransportMasterNodeActi
});
}
chainTaskExecutor.execute(ActionListener.wrap(
aVoid -> {
voidChainTaskExecutor.execute(ActionListener.wrap(
aVoids -> {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
},

View File

@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService;
import static org.elasticsearch.xpack.ml.ResultsIndexUpgradeService.wasIndexCreatedInCurrentMajorVersion;
public class TransportMlUpgradeAction
extends TransportMasterNodeReadAction<MlUpgradeAction.Request, AcknowledgedResponse> {
private final Client client;
private final ResultsIndexUpgradeService resultsIndexUpgradeService;
@Inject
public TransportMlUpgradeAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, Client client,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(MlUpgradeAction.NAME, transportService, clusterService, threadPool,
actionFilters, MlUpgradeAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver,
executor(),
indexMetadata -> wasIndexCreatedInCurrentMajorVersion(indexMetadata) == false);
}
@Override
protected void masterOperation(Task task, MlUpgradeAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, taskId);
try {
resultsIndexUpgradeService.upgrade(parentAwareClient, request, state, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
protected final void masterOperation(MlUpgradeAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
@Override
protected ClusterBlockException checkBlock(MlUpgradeAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
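
Aside: because Request.createTask (earlier in this PR) returns a CancellableTask with shouldCancelChildrenOnCancellation() == true, and masterOperation wraps the client in a ParentTaskAssigningClient, cancelling the parent task should also cancel in-flight child reindex tasks. A hedged sketch via the standard tasks API, where `taskId` and `lowLevelRestClient` are assumed handles (the "<node_id>:<task_id>" string returned when wait_for_completion=false, and an org.elasticsearch.client.RestClient):

// Sketch only: cancel the upgrade and its child reindex tasks.
org.elasticsearch.client.Request cancel =
        new org.elasticsearch.client.Request("POST", "/_tasks/" + taskId + "/_cancel");
lowLevelRestClient.performRequest(cancel);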

View File

@@ -55,7 +55,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor;
import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
import java.io.IOException;
import java.util.ArrayList;
@@ -397,16 +397,16 @@ public class JobManager {
}
private void validate(Job job, JobUpdate jobUpdate, ActionListener<Void> handler) {
ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(client.threadPool().executor(
VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(client.threadPool().executor(
MachineLearning.UTILITY_THREAD_POOL_NAME), true);
validateModelSnapshotIdUpdate(job, jobUpdate.getModelSnapshotId(), chainTaskExecutor);
validateAnalysisLimitsUpdate(job, jobUpdate.getAnalysisLimits(), chainTaskExecutor);
chainTaskExecutor.execute(handler);
validateModelSnapshotIdUpdate(job, jobUpdate.getModelSnapshotId(), voidChainTaskExecutor);
validateAnalysisLimitsUpdate(job, jobUpdate.getAnalysisLimits(), voidChainTaskExecutor);
voidChainTaskExecutor.execute(ActionListener.wrap(aVoids -> handler.onResponse(null), handler::onFailure));
}
private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, ChainTaskExecutor chainTaskExecutor) {
private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, VoidChainTaskExecutor voidChainTaskExecutor) {
if (modelSnapshotId != null) {
chainTaskExecutor.add(listener -> {
voidChainTaskExecutor.add(listener -> {
jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> {
if (newModelSnapshot == null) {
String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId,
@@ -428,12 +428,12 @@ public class JobManager {
}
}
private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, ChainTaskExecutor chainTaskExecutor) {
private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, VoidChainTaskExecutor voidChainTaskExecutor) {
if (newLimits == null || newLimits.getModelMemoryLimit() == null) {
return;
}
Long newModelMemoryLimit = newLimits.getModelMemoryLimit();
chainTaskExecutor.add(listener -> {
voidChainTaskExecutor.add(listener -> {
if (isJobOpen(clusterService.state(), job.getId())) {
listener.onFailure(ExceptionsHelper.badRequestException("Cannot update " + Job.ANALYSIS_LIMITS.getPreferredName()
+ " while the job is open"));

View File

@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.results;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpgradeMlAction extends BaseRestHandler {
private static final DeprecationLogger deprecationLogger =
new DeprecationLogger(LogManager.getLogger(RestUpgradeMlAction.class));
public RestUpgradeMlAction(Settings settings, RestController controller) {
super(settings);
controller.registerWithDeprecatedHandler(
POST,
MachineLearning.BASE_PATH + "_upgrade",
this,
POST,
MachineLearning.PRE_V7_BASE_PATH + "_upgrade",
deprecationLogger);
}
@Override
public String getName() {
return "xpack_ml_upgrade_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
MlUpgradeAction.Request parsedRequest = new MlUpgradeAction.Request();
if (restRequest.hasContent()) {
XContentParser parser = restRequest.contentParser();
parsedRequest = MlUpgradeAction.Request.PARSER.apply(parser, null);
}
final MlUpgradeAction.Request upgradeRequest = parsedRequest;
if (restRequest.paramAsBoolean("wait_for_completion", false)) {
return channel -> client.execute(MlUpgradeAction.INSTANCE, upgradeRequest, new RestToXContentListener<>(channel));
} else {
upgradeRequest.setShouldStoreResult(true);
Task task = client.executeLocally(MlUpgradeAction.INSTANCE, upgradeRequest, LoggingTaskListener.instance());
// Return the task id immediately rather than waiting for the action to complete
return channel -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", client.getLocalNodeId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
};
}
}
}
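The handler above produces two response shapes: with wait_for_completion=true the action's acknowledgement is returned directly, otherwise a task id of the form <nodeId>:<taskId> comes back immediately. A hedged sketch of driving the endpoint with the low-level REST client (host, port, and the parameter value are assumptions for illustration):

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Illustrative sketch only, not part of the change.
public class UpgradeCallSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // wait_for_completion=false: the action stores its result and the
            // handler immediately answers with {"task": "<nodeId>:<taskId>"}.
            Request request = new Request("POST", "/_ml/_upgrade");
            request.addParameter("wait_for_completion", "false");
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
            // The returned id can then be polled via GET /_tasks/<task>.
        }
    }
}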

View File

@@ -1,60 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
/**
* A utility that allows chained (serial) execution of a number of tasks
* in async manner.
*/
public class ChainTaskExecutor {
public interface ChainTask {
void run(ActionListener<Void> listener);
}
private final ExecutorService executorService;
private final boolean shortCircuit;
private final LinkedList<ChainTask> tasks = new LinkedList<>();
public ChainTaskExecutor(ExecutorService executorService, boolean shortCircuit) {
this.executorService = Objects.requireNonNull(executorService);
this.shortCircuit = shortCircuit;
}
public synchronized void add(ChainTask task) {
tasks.add(task);
}
public synchronized void execute(ActionListener<Void> listener) {
if (tasks.isEmpty()) {
listener.onResponse(null);
return;
}
ChainTask task = tasks.pop();
executorService.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (shortCircuit) {
listener.onFailure(e);
} else {
execute(listener);
}
}
@Override
protected void doRun() {
task.run(ActionListener.wrap(nullValue -> execute(listener), this::onFailure));
}
});
}
}

View File

@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
/**
* A utility that allows chained (serial) execution of a number of tasks
* in an async manner.
*/
public class TypedChainTaskExecutor<T> {
public interface ChainTask<T> {
void run(ActionListener<T> listener);
}
private final ExecutorService executorService;
private final LinkedList<ChainTask<T>> tasks = new LinkedList<>();
private final Predicate<Exception> failureShortCircuitPredicate;
private final Predicate<T> continuationPredicate;
private final List<T> collectedResponses;
/**
* Creates a new TypedChainTaskExecutor.
* Each chained task is executed serially, in order, and after each execution the continuationPredicate is tested.
*
* On failures the failureShortCircuitPredicate is tested.
*
* @param executorService The service on which to execute the tasks
* @param continuationPredicate The predicate that decides whether to execute the next task.
* {@code true} means continue on to the next task.
* Must be able to handle null values.
* @param failureShortCircuitPredicate The predicate that decides whether to short circuit execution on a given exception.
* {@code true} means that no more tasks should execute and listener::onFailure should be
* called.
*/
public TypedChainTaskExecutor(ExecutorService executorService,
Predicate<T> continuationPredicate,
Predicate<Exception> failureShortCircuitPredicate) {
this.executorService = Objects.requireNonNull(executorService);
this.continuationPredicate = continuationPredicate;
this.failureShortCircuitPredicate = failureShortCircuitPredicate;
this.collectedResponses = new ArrayList<>();
}
public synchronized void add(ChainTask<T> task) {
tasks.add(task);
}
private synchronized void execute(T previousValue, ActionListener<List<T>> listener) {
collectedResponses.add(previousValue);
if (continuationPredicate.test(previousValue)) {
if (tasks.isEmpty()) {
listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses)));
return;
}
ChainTask<T> task = tasks.pop();
executorService.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (failureShortCircuitPredicate.test(e)) {
listener.onFailure(e);
} else {
execute(null, listener);
}
}
@Override
protected void doRun() {
task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure));
}
});
} else {
listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses)));
}
}
/**
* Executes all the chained tasks serially and notifies the listener when completed.
*
* @param listener The ActionListener to notify when all executions have been completed,
* or when no further tasks should be executed.
* The resulting list may contain null values, depending on whether
* execution continues after exceptions.
*/
public synchronized void execute(ActionListener<List<T>> listener) {
if (tasks.isEmpty()) {
listener.onResponse(Collections.emptyList());
return;
}
collectedResponses.clear();
ChainTask<T> task = tasks.pop();
executorService.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (failureShortCircuitPredicate.test(e)) {
listener.onFailure(e);
} else {
execute(null, listener);
}
}
@Override
protected void doRun() {
task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure));
}
});
}
public synchronized List<T> getCollectedResponses() {
return Collections.unmodifiableList(new ArrayList<>(collectedResponses));
}
}
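A hedged usage sketch for the typed executor (all names invented for illustration): tasks run serially, every result is collected, and the chain stops as soon as the continuation predicate rejects a value:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only, not part of the change.
public class TypedChainSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        TypedChainTaskExecutor<Integer> executor = new TypedChainTaskExecutor<>(
                executorService,
                value -> value == null || value >= 0, // continue while results are non-negative
                e -> true);                           // any exception short circuits
        executor.add(listener -> listener.onResponse(1));
        executor.add(listener -> listener.onResponse(-1)); // rejected by the predicate, halts the chain
        executor.add(listener -> listener.onResponse(2));  // never runs
        executor.execute(ActionListener.wrap(
                results -> { System.out.println(results); latch.countDown(); }, // prints [1, -1]
                e -> latch.countDown()));
        latch.await();
        executorService.shutdown();
    }
}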

View File

@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
/**
* A utility that allows chained (serial) execution of a number of {@code Void}-typed tasks
* in an async manner.
*/
public class VoidChainTaskExecutor extends TypedChainTaskExecutor<Void> {
public VoidChainTaskExecutor(ExecutorService executorService, boolean shortCircuit) {
this(executorService, (a) -> true, (e) -> shortCircuit);
}
VoidChainTaskExecutor(ExecutorService executorService,
Predicate<Void> continuationPredicate,
Predicate<Exception> failureShortCircuitPredicate) {
super(executorService, continuationPredicate, failureShortCircuitPredicate);
}
}
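For reference, the boolean constructor is shorthand for fixing the two predicates of the parent class; a sketch of the equivalence (illustrative only, not part of the change):

import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: these two executors behave identically.
public class VoidChainSketch {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        VoidChainTaskExecutor viaFlag = new VoidChainTaskExecutor(executorService, true);
        TypedChainTaskExecutor<Void> viaPredicates = new TypedChainTaskExecutor<>(
                executorService,
                v -> true,  // always continue after a successful task
                e -> true); // short circuit on any failure
        executorService.shutdown();
    }
}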

View File

@@ -19,7 +19,7 @@ import java.util.function.Consumer;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
-public class ChainTaskExecutorTests extends ESTestCase {
+public class VoidChainTaskExecutorTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
private final CountDownLatch latch = new CountDownLatch(1);
@@ -36,18 +36,18 @@ public class ChainTaskExecutorTests extends ESTestCase {
public void testExecute() throws InterruptedException {
final List<String> strings = new ArrayList<>();
-ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
-ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false);
-chainTaskExecutor.add(listener -> {
+ActionListener<List<Void>> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
+VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), false);
+voidChainTaskExecutor.add(listener -> {
strings.add("first");
listener.onResponse(null);
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
strings.add("second");
listener.onResponse(null);
});
-chainTaskExecutor.execute(finalListener);
+voidChainTaskExecutor.execute(finalListener);
latch.await();
@@ -56,22 +56,22 @@ public class ChainTaskExecutorTests extends ESTestCase {
public void testExecute_GivenSingleFailureAndShortCircuit() throws InterruptedException {
final List<String> strings = new ArrayList<>();
-ActionListener<Void> finalListener = createBlockingListener(() -> fail(),
+ActionListener<List<Void>> finalListener = createBlockingListener(() -> fail(),
e -> assertThat(e.getMessage(), equalTo("some error")));
-ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), true);
-chainTaskExecutor.add(listener -> {
+VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), true);
+voidChainTaskExecutor.add(listener -> {
strings.add("before");
listener.onResponse(null);
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
throw new RuntimeException("some error");
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
strings.add("after");
listener.onResponse(null);
});
-chainTaskExecutor.execute(finalListener);
+voidChainTaskExecutor.execute(finalListener);
latch.await();
@@ -80,21 +80,21 @@ public class ChainTaskExecutorTests extends ESTestCase {
public void testExecute_GivenMultipleFailuresAndShortCircuit() throws InterruptedException {
final List<String> strings = new ArrayList<>();
-ActionListener<Void> finalListener = createBlockingListener(() -> fail(),
+ActionListener<List<Void>> finalListener = createBlockingListener(() -> fail(),
e -> assertThat(e.getMessage(), equalTo("some error 1")));
-ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), true);
-chainTaskExecutor.add(listener -> {
+VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), true);
+voidChainTaskExecutor.add(listener -> {
strings.add("before");
listener.onResponse(null);
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
throw new RuntimeException("some error 1");
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
throw new RuntimeException("some error 2");
});
-chainTaskExecutor.execute(finalListener);
+voidChainTaskExecutor.execute(finalListener);
latch.await();
@@ -103,21 +103,21 @@ public class ChainTaskExecutorTests extends ESTestCase {
public void testExecute_GivenFailureAndNoShortCircuit() throws InterruptedException {
final List<String> strings = new ArrayList<>();
-ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
-ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false);
-chainTaskExecutor.add(listener -> {
+ActionListener<List<Void>> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
+VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), false);
+voidChainTaskExecutor.add(listener -> {
strings.add("before");
listener.onResponse(null);
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
throw new RuntimeException("some error");
});
-chainTaskExecutor.add(listener -> {
+voidChainTaskExecutor.add(listener -> {
strings.add("after");
listener.onResponse(null);
});
-chainTaskExecutor.execute(finalListener);
+voidChainTaskExecutor.execute(finalListener);
latch.await();
@@ -126,17 +126,17 @@ public class ChainTaskExecutorTests extends ESTestCase {
public void testExecute_GivenNoTasksAdded() throws InterruptedException {
final List<String> strings = new ArrayList<>();
-ActionListener<Void> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
-ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.generic(), false);
+ActionListener<List<Void>> finalListener = createBlockingListener(() -> strings.add("last"), e -> fail());
+VoidChainTaskExecutor voidChainTaskExecutor = new VoidChainTaskExecutor(threadPool.generic(), false);
-chainTaskExecutor.execute(finalListener);
+voidChainTaskExecutor.execute(finalListener);
latch.await();
assertThat(strings, contains("last"));
}
-private ActionListener<Void> createBlockingListener(Runnable runnable, Consumer<Exception> errorHandler) {
+private ActionListener<List<Void>> createBlockingListener(Runnable runnable, Consumer<Exception> errorHandler) {
return ActionListener.wrap(nullValue -> {
runnable.run();
latch.countDown();
@@ -145,4 +145,4 @@ public class ChainTaskExecutorTests extends ESTestCase {
latch.countDown();
});
}
-}
+}

View File

@@ -0,0 +1,21 @@
{
"ml.upgrade": {
"documentation": "TODO",
"methods": [ "POST" ],
"url": {
"path": "/_ml/_upgrade",
"paths": [ "/_ml/_upgrade" ],
"params": {
"wait_for_completion": {
"type": "boolean",
"description": "Should this request wait until the operation has completed before returning",
"default": false
}
}
},
"body": {
"description" : "Upgrade options",
"required" : false
}
}
}

View File

@@ -0,0 +1,70 @@
setup:
- skip:
features: headers
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_job:
job_id: jobs-upgrade-results
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"format":"xcontent",
"time_field":"time"
}
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
Content-Type: application/json
index:
index: .ml-anomalies-jobs-upgrade-results
type: doc
id: "jobs-upgrade-results_1464739200000_1"
body:
{
"job_id": "jobs-upgrade-results",
"result_type": "bucket",
"timestamp": "2016-06-01T00:00:00Z",
"anomaly_score": 90.0,
"bucket_span":1
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.refresh:
index: .ml-anomalies-jobs-upgrade-results
---
"Upgrade results when there is nothing to upgrade":
- do:
ml.upgrade:
wait_for_completion: true
- match: { acknowledged: true }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''
---
"Upgrade results when there is nothing to upgrade, not waiting for results":
- do:
ml.upgrade:
wait_for_completion: false
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
tasks.get:
wait_for_completion: true
task_id: $task
- match: {completed: true}
- match: {response.acknowledged: true}

View File

@ -0,0 +1,11 @@
---
"Verify jobs exist":
- do:
ml.get_jobs:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
- do:
ml.get_jobs:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 1 }

View File

@@ -0,0 +1,120 @@
---
"Put job on the old cluster and post some data":
- do:
ml.put_job:
job_id: old-cluster-job-to-upgrade
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: old-cluster-job-to-upgrade }
- do:
ml.open_job:
job_id: old-cluster-job-to-upgrade
- do:
ml.post_data:
job_id: old-cluster-job-to-upgrade
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: post-data-job
time: 1403481700
- match: { processed_record_count: 2 }
- do:
ml.close_job:
job_id: old-cluster-job-to-upgrade
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
# Wait for indices to be fully allocated before
# killing the node
- do:
cluster.health:
index: [".ml-state", ".ml-anomalies-shared"]
wait_for_status: green
---
"Put job on the old cluster with a custom index":
- do:
ml.put_job:
job_id: old-cluster-job-to-upgrade-custom
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
},
"results_index_name": "old-cluster-job-to-upgrade-custom"
}
- match: { job_id: old-cluster-job-to-upgrade-custom }
- do:
ml.open_job:
job_id: old-cluster-job-to-upgrade-custom
- do:
ml.post_data:
job_id: old-cluster-job-to-upgrade-custom
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: post-data-job
time: 1403481700
- airline: JZA
responsetime: 423.0000
sourcetype: post-data-job
time: 1403481800
- match: { processed_record_count: 3 }
- do:
ml.close_job:
job_id: old-cluster-job-to-upgrade-custom
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 3 }
# Wait for indices to be fully allocated before
# killing the node
- do:
cluster.health:
index: [".ml-state", ".ml-anomalies-old-cluster-job-to-upgrade-custom"]
wait_for_status: green

View File

@@ -0,0 +1,158 @@
---
"Migrate results data to latest index binary version":
# Verify that all the results are there and the typical indices exist
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 3 }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''
- do:
indices.get_settings:
index: .ml-anomalies-shared
name: index.version.created
- match: { \.ml-anomalies-shared.settings.index.version.created: '/6\d+/' }
- do:
indices.exists:
index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom
- is_true: ''
# Do the upgrade
- do:
ml.upgrade:
wait_for_completion: true
- match: { acknowledged: true }
# Verify that old indices are gone
- do:
indices.exists:
index: .ml-anomalies-shared
- is_false: ''
- do:
indices.exists:
index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom
- is_false: ''
# Verify that results can still be retrieved
- do:
indices.refresh: {}
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade
- match: { count: 1 }
- do:
ml.get_buckets:
job_id: old-cluster-job-to-upgrade-custom
- match: { count: 3 }
# Verify the created version is correct
- do:
indices.get_settings:
index: .ml-anomalies-old-cluster-job-to-upgrade
name: index.version.created
- match: { \.ml-anomalies-shared-7.settings.index.version.created: '/7\d+/' }
- match: { \.ml-anomalies-shared-7r.settings.index.version.created: '/7\d+/' }
- do:
indices.get_settings:
index: .ml-anomalies-old-cluster-job-to-upgrade-custom
name: index.version.created
- match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7.settings.index.version.created: '/7\d+/' }
- match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7r.settings.index.version.created: '/7\d+/' }
# Create a new job to verify that the .ml-anomalies-shared index gets created again without issues
- do:
ml.put_job:
job_id: upgraded-cluster-job-should-not-upgrade
body: >
{
"description":"Cluster upgrade",
"analysis_config" : {
"bucket_span": "60s",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"analysis_limits" : {
"model_memory_limit": "50mb"
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- match: { job_id: upgraded-cluster-job-should-not-upgrade }
- do:
ml.open_job:
job_id: upgraded-cluster-job-should-not-upgrade
- do:
ml.post_data:
job_id: upgraded-cluster-job-should-not-upgrade
body:
- airline: AAL
responsetime: 132.2046
sourcetype: post-data-job
time: 1403481600
- airline: JZA
responsetime: 990.4628
sourcetype: post-data-job
time: 1403481700
- match: { processed_record_count: 2 }
- do:
ml.close_job:
job_id: upgraded-cluster-job-should-not-upgrade
- do:
ml.get_buckets:
job_id: upgraded-cluster-job-should-not-upgrade
- match: { count: 1 }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''
- do:
indices.get_settings:
index: .ml-anomalies-shared
name: index.version.created
- match: { \.ml-anomalies-shared.settings.index.version.created: '/7\d+/' }
# Do the upgrade again, as nothing needs upgrading now
- do:
ml.upgrade:
wait_for_completion: true
- match: { acknowledged: true }
- do:
indices.exists:
index: .ml-anomalies-shared
- is_true: ''