ML: Adds set_upgrade_mode API endpoint (#37837)

* ML: Add MlMetadata.upgrade_mode and API

* Adding tests

* Adding wait conditionals for the upgrade_mode call to return

* Adding tests

* adjusting format and tests

* Adjusting wait conditions for api return and msgs

* adjusting doc tests

* adding upgrade mode tests to black list
This commit is contained in:
Benjamin Trent 2019-01-28 09:07:30 -06:00 committed by GitHub
parent e401ab1724
commit 7e4c0e6991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1112 additions and 70 deletions

View File

@ -54,6 +54,7 @@ This is a possible response:
"scroll_size" : 1000
}
},
"upgrade_mode": false,
"limits" : { }
}
----

View File

@ -108,6 +108,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
@ -291,6 +292,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
PostCalendarEventsAction.INSTANCE,
PersistJobAction.INSTANCE,
FindFileStructureAction.INSTANCE,
SetUpgradeModeAction.INSTANCE,
// security
ClearRealmCacheAction.INSTANCE,
ClearRolesCacheAction.INSTANCE,

View File

@ -48,8 +48,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
public static final String TYPE = "ml";
private static final ParseField JOBS_FIELD = new ParseField("jobs");
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
public static final ParseField UPGRADE_MODE = new ParseField("upgrade_mode");
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), false);
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);
@ -57,16 +58,20 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD);
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
LENIENT_PARSER.declareBoolean(Builder::isUpgradeMode, UPGRADE_MODE);
}
private final SortedMap<String, Job> jobs;
private final SortedMap<String, DatafeedConfig> datafeeds;
private final boolean upgradeMode;
private final GroupOrJobLookup groupOrJobLookup;
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds) {
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, boolean upgradeMode) {
this.jobs = Collections.unmodifiableSortedMap(jobs);
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
this.upgradeMode = upgradeMode;
}
public Map<String, Job> getJobs() {
@ -94,6 +99,10 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
.expand(expression, allowNoDatafeeds);
}
public boolean isUpgradeMode() {
return upgradeMode;
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_6_0_0_alpha1;
@ -128,12 +137,20 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
}
this.datafeeds = datafeeds;
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
this.upgradeMode = in.readBoolean();
} else {
this.upgradeMode = false;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeMap(jobs, out);
writeMap(datafeeds, out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(upgradeMode);
}
}
private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
@ -150,6 +167,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
builder.field(UPGRADE_MODE.getPreferredName(), upgradeMode);
return builder;
}
@ -170,10 +188,12 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
final Diff<Map<String, Job>> jobs;
final Diff<Map<String, DatafeedConfig>> datafeeds;
final boolean upgradeMode;
MlMetadataDiff(MlMetadata before, MlMetadata after) {
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
this.upgradeMode = after.upgradeMode;
}
public MlMetadataDiff(StreamInput in) throws IOException {
@ -181,6 +201,11 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
MlMetadataDiff::readJobDiffFrom);
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
MlMetadataDiff::readDatafeedDiffFrom);
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
upgradeMode = in.readBoolean();
} else {
upgradeMode = false;
}
}
/**
@ -192,13 +217,16 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
public MetaData.Custom apply(MetaData.Custom part) {
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
return new MlMetadata(newJobs, newDatafeeds);
return new MlMetadata(newJobs, newDatafeeds, upgradeMode);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
jobs.writeTo(out);
datafeeds.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(upgradeMode);
}
}
@Override
@ -223,7 +251,8 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
return false;
MlMetadata that = (MlMetadata) o;
return Objects.equals(jobs, that.jobs) &&
Objects.equals(datafeeds, that.datafeeds);
Objects.equals(datafeeds, that.datafeeds) &&
Objects.equals(upgradeMode, that.upgradeMode);
}
@Override
@ -233,13 +262,14 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
@Override
public int hashCode() {
return Objects.hash(jobs, datafeeds);
return Objects.hash(jobs, datafeeds, upgradeMode);
}
public static class Builder {
private TreeMap<String, Job> jobs;
private TreeMap<String, DatafeedConfig> datafeeds;
private boolean upgradeMode;
public Builder() {
jobs = new TreeMap<>();
@ -253,6 +283,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
} else {
jobs = new TreeMap<>(previous.jobs);
datafeeds = new TreeMap<>(previous.datafeeds);
upgradeMode = previous.upgradeMode;
}
}
@ -318,8 +349,13 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
return this;
}
public Builder isUpgradeMode(boolean upgradeMode) {
this.upgradeMode = upgradeMode;
return this;
}
public MlMetadata build() {
return new MlMetadata(jobs, datafeeds);
return new MlMetadata(jobs, datafeeds, upgradeMode);
}
}

View File

@ -24,8 +24,12 @@ public final class MlTasks {
public static final String JOB_TASK_NAME = "xpack/ml/job";
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";
private static final String JOB_TASK_ID_PREFIX = "job-";
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";
public static final String JOB_TASK_ID_PREFIX = "job-";
public static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";
public static final PersistentTasksCustomMetaData.Assignment AWAITING_UPGRADE =
new PersistentTasksCustomMetaData.Assignment(null,
"persistent task cannot be assigned while upgrade mode is enabled.");
private MlTasks() {
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class SetUpgradeModeAction extends Action<AcknowledgedResponse> {
public static final SetUpgradeModeAction INSTANCE = new SetUpgradeModeAction();
public static final String NAME = "cluster:admin/xpack/ml/upgrade_mode";
private SetUpgradeModeAction() {
super(NAME);
}
@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
private boolean enabled;
private static final ParseField ENABLED = new ParseField("enabled");
public static final ConstructingObjectParser<Request, Void> PARSER =
new ConstructingObjectParser<>(NAME, a -> new Request((Boolean)a[0]));
static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
}
public Request(boolean enabled) {
this.enabled = enabled;
}
public Request(StreamInput in) throws IOException {
readFrom(in);
}
public Request() {
}
public boolean isEnabled() {
return enabled;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.enabled = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(enabled);
}
@Override
public int hashCode() {
return Objects.hash(enabled);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(enabled, other.enabled);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ENABLED.getPreferredName(), enabled);
builder.endObject();
return builder;
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> {
RequestBuilder(ElasticsearchClient client, SetUpgradeModeAction action) {
super(client, action, new Request());
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction.Request;
public class SetUpgradeModeActionRequestTests extends AbstractSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomBoolean());
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request doParseInstance(XContentParser parser) {
return Request.PARSER.apply(parser, null);
}
}

View File

@ -87,6 +87,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
@ -868,6 +869,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
assertThat(role.cluster().check(PutFilterAction.NAME, request), is(true));
assertThat(role.cluster().check(PutJobAction.NAME, request), is(true));
assertThat(role.cluster().check(RevertModelSnapshotAction.NAME, request), is(true));
assertThat(role.cluster().check(SetUpgradeModeAction.NAME, request), is(true));
assertThat(role.cluster().check(StartDatafeedAction.NAME, request), is(true));
assertThat(role.cluster().check(StopDatafeedAction.NAME, request), is(true));
assertThat(role.cluster().check(UpdateCalendarJobAction.NAME, request), is(true));
@ -938,6 +940,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
assertThat(role.cluster().check(PutFilterAction.NAME, request), is(false));
assertThat(role.cluster().check(PutJobAction.NAME, request), is(false));
assertThat(role.cluster().check(RevertModelSnapshotAction.NAME, request), is(false));
assertThat(role.cluster().check(SetUpgradeModeAction.NAME, request), is(false));
assertThat(role.cluster().check(StartDatafeedAction.NAME, request), is(false));
assertThat(role.cluster().check(StopDatafeedAction.NAME, request), is(false));
assertThat(role.cluster().check(UpdateCalendarJobAction.NAME, request), is(false));

View File

@ -93,7 +93,11 @@ integTestRunner {
'ml/validate/Test job config that is invalid only because of the job ID',
'ml/validate_detector/Test invalid detector',
'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts',
'ml/delete_forecast/Test delete forecast on missing forecast'
'ml/delete_forecast/Test delete forecast on missing forecast',
'ml/set_upgrade_mode/Attempt to open job when upgrade_mode is enabled',
'ml/set_upgrade_mode/Setting upgrade_mode to enabled',
'ml/set_upgrade_mode/Setting upgrade mode to disabled from enabled',
'ml/set_upgrade_mode/Test setting upgrade_mode to false when it is already false'
].join(',')
}

View File

@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDatafeedStats;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
@After
public void cleanup() throws Exception {
cleanUp();
}
public void testEnableUpgradeMode() throws Exception {
String jobId = "realtime-job-test-enable-upgrade-mode";
String datafeedId = jobId + "-datafeed";
startRealtime(jobId);
// Assert appropriate task state and assignment numbers
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(2));
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
PersistentTasksCustomMetaData persistentTasks = masterClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
// Set the upgrade mode setting
AcknowledgedResponse response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true))
.actionGet();
assertThat(response.isAcknowledged(), equalTo(true));
masterClusterState = client().admin().cluster().prepareState().all().get().getState();
// Assert state for tasks still exists and that the upgrade setting is set
persistentTasks = masterClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(true));
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks(), is(empty()));
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0);
assertThat(jobStats.getState(), equalTo(JobState.OPENED));
assertThat(jobStats.getAssignmentExplanation(), equalTo(MlTasks.AWAITING_UPGRADE.getExplanation()));
assertThat(jobStats.getNode(), is(nullValue()));
GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats = getDatafeedStats(datafeedId);
assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
assertThat(datafeedStats.getAssignmentExplanation(), equalTo(MlTasks.AWAITING_UPGRADE.getExplanation()));
assertThat(datafeedStats.getNode(), is(nullValue()));
Job.Builder job = createScheduledJob("job-should-not-open");
registerJob(job);
putJob(job);
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> openJob(job.getId()));
assertThat(statusException.status(), equalTo(RestStatus.TOO_MANY_REQUESTS));
assertThat(statusException.getMessage(), equalTo("Cannot open jobs when upgrade mode is enabled"));
//Disable the setting
response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(false))
.actionGet();
assertThat(response.isAcknowledged(), equalTo(true));
masterClusterState = client().admin().cluster().prepareState().all().get().getState();
persistentTasks = masterClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
assertBusy(() -> assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(2)));
jobStats = getJobStats(jobId).get(0);
assertThat(jobStats.getState(), equalTo(JobState.OPENED));
assertThat(jobStats.getAssignmentExplanation(), isEmptyString());
assertThat(jobStats.getNode(), is(not(nullValue())));
datafeedStats = getDatafeedStats(datafeedId);
assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
assertThat(datafeedStats.getAssignmentExplanation(), isEmptyString());
assertThat(datafeedStats.getNode(), is(not(nullValue())));
}
private void startRealtime(String jobId) throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs1 = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long lastWeek = now - 604800000;
indexDocs(logger, "data", numDocs1, lastWeek, now);
Job.Builder job = createScheduledJob(jobId);
registerJob(job);
putJob(job);
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, null);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
long numDocs2 = randomIntBetween(2, 64);
now = System.currentTimeMillis();
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
}, 30, TimeUnit.SECONDS);
}
}

View File

@ -97,6 +97,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
@ -152,6 +153,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportPutFilterAction;
import org.elasticsearch.xpack.ml.action.TransportPutJobAction;
import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportUpdateCalendarJobAction;
@ -190,6 +192,7 @@ import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
import org.elasticsearch.xpack.ml.rest.RestMlInfoAction;
import org.elasticsearch.xpack.ml.rest.RestSetUpgradeModeAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarEventAction;
import org.elasticsearch.xpack.ml.rest.calendar.RestDeleteCalendarJobAction;
@ -425,7 +428,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
normalizerFactory, xContentRegistry, auditor, clusterService);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
@ -542,7 +545,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new RestPutCalendarJobAction(settings, restController),
new RestGetCalendarEventsAction(settings, restController),
new RestPostCalendarEventAction(settings, restController),
new RestFindFileStructureAction(settings, restController)
new RestFindFileStructureAction(settings, restController),
new RestSetUpgradeModeAction(settings, restController)
);
}
@ -600,7 +604,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class),
new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class),
new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class),
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class)
new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class),
new ActionHandler<>(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)
);
}
@Override

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
@ -39,6 +40,7 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
Map<String, Object> info = new HashMap<>();
info.put("defaults", defaults());
info.put("limits", limits());
info.put(MlMetadata.UPGRADE_MODE.getPreferredName(), upgradeMode());
listener.onResponse(new MlInfoAction.Response(info));
}
@ -49,6 +51,10 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
return defaults;
}
private boolean upgradeMode() {
return MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode();
}
private Map<String, Object> anomalyDetectorsDefaults() {
Map<String, Object> defaults = new HashMap<>();
defaults.put(AnalysisLimits.MODEL_MEMORY_LIMIT.getPreferredName(), defaultModelMemoryLimit());

View File

@ -40,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
@ -66,6 +67,7 @@ import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE;
/*
@ -550,6 +552,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final AutodetectProcessManager autodetectProcessManager;
private final MlMemoryTracker memoryTracker;
private final Client client;
private final ClusterService clusterService;
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
@ -566,6 +569,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.clusterService = clusterService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
clusterService.getClusterSettings()
@ -583,6 +587,11 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
return AWAITING_MIGRATION;
}
// If we are waiting for an upgrade to complete, we should not assign to a node
if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
return AWAITING_UPGRADE;
}
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
params.getJob(),
clusterState,
@ -613,6 +622,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
PersistentTasksCustomMetaData.Assignment assignment = getAssignment(params, clusterState);
if (assignment.equals(AWAITING_UPGRADE)) {
throw makeCurrentlyBeingUpgradedException(logger, params.getJobId(), assignment.getExplanation());
}
if (assignment.getExecutorNode() == null && assignment.equals(AWAITING_LAZY_ASSIGNMENT) == false) {
throw makeNoSuitableNodesException(logger, params.getJobId(), assignment.getExplanation());
}
@ -631,14 +644,18 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
String jobId = jobTask.getJobId();
autodetectProcessManager.openJob(jobTask, clusterState, e2 -> {
autodetectProcessManager.openJob(jobTask, clusterState, (e2, shouldFinalizeJob) -> {
if (e2 == null) {
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
if (shouldFinalizeJob) {
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(
response -> task.markAsCompleted(),
e -> logger.error("error finalizing job [" + jobId + "]", e)
response -> task.markAsCompleted(),
e -> logger.error("error finalizing job [" + jobId + "]", e)
));
} else {
task.markAsCompleted();
}
} else {
task.markAsFailed(e2);
}
@ -649,7 +666,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask,
Map<String, String> headers) {
return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId, headers);
return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId, headers);
}
void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
@ -701,7 +718,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
void closeJob(String reason) {
autodetectProcessManager.closeJob(this, false, reason);
}
}
/**
@ -772,4 +788,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
return new ElasticsearchStatusException("Could not open job because no ML nodes with sufficient capacity were found",
RestStatus.TOO_MANY_REQUESTS, detail);
}
static ElasticsearchException makeCurrentlyBeingUpgradedException(Logger logger, String jobId, String explanation) {
String msg = "Cannot open jobs when upgrade mode is enabled";
logger.warn("[{}] {}", jobId, msg);
return new ElasticsearchStatusException(msg, RestStatus.TOO_MANY_REQUESTS);
}
}

View File

@ -0,0 +1,271 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.core.ml.MlTasks.DATAFEED_TASK_NAME;
import static org.elasticsearch.xpack.core.ml.MlTasks.JOB_TASK_NAME;
public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<SetUpgradeModeAction.Request, AcknowledgedResponse> {
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final PersistentTasksClusterService persistentTasksClusterService;
private final PersistentTasksService persistentTasksService;
private final ClusterService clusterService;
private final Client client;
@Inject
public TransportSetUpgradeModeAction(TransportService transportService, ThreadPool threadPool, ClusterService clusterService,
PersistentTasksClusterService persistentTasksClusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
PersistentTasksService persistentTasksService) {
super(SetUpgradeModeAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
SetUpgradeModeAction.Request::new);
this.persistentTasksClusterService = persistentTasksClusterService;
this.clusterService = clusterService;
this.client = client;
this.persistentTasksService = persistentTasksService;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
@Override
protected void masterOperation(SetUpgradeModeAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
// Don't want folks spamming this endpoint while it is in progress, only allow one request to be handled at a time
if (isRunning.compareAndSet(false, true) == false) {
String msg = "Attempted to set [upgrade_mode] to [" +
request.isEnabled() + "] from [" + MlMetadata.getMlMetadata(state).isUpgradeMode() +
"] while previous request was processing.";
Exception detail = new IllegalStateException(msg);
listener.onFailure(new ElasticsearchStatusException(
"Cannot change [upgrade_mode]. Previous request is still being processed.",
RestStatus.TOO_MANY_REQUESTS,
detail));
return;
}
// Noop, nothing for us to do, simply return fast to the caller
if (request.isEnabled() == MlMetadata.getMlMetadata(state).isUpgradeMode()) {
isRunning.set(false);
listener.onResponse(new AcknowledgedResponse(true));
return;
}
ActionListener<AcknowledgedResponse> wrappedListener = ActionListener.wrap(
r -> {
isRunning.set(false);
listener.onResponse(r);
},
e -> {
isRunning.set(false);
listener.onFailure(e);
}
);
final PersistentTasksCustomMetaData tasksCustomMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
// <4> We have unassigned the tasks, respond to the listener.
ActionListener<List<PersistentTask<?>>> unassignPersistentTasksListener = ActionListener.wrap(
unassigndPersistentTasks -> {
// Wait for our tasks to all stop
client.admin()
.cluster()
.prepareListTasks()
.setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]")
.setWaitForCompletion(true)
.setTimeout(request.timeout()).execute(ActionListener.wrap(
r -> wrappedListener.onResponse(new AcknowledgedResponse(true)),
wrappedListener::onFailure));
},
wrappedListener::onFailure
);
// <3> After isolating the datafeeds, unassign the tasks
ActionListener<List<IsolateDatafeedAction.Response>> isolateDatafeedListener = ActionListener.wrap(
isolatedDatafeeds -> unassignPersistentTasks(tasksCustomMetaData, unassignPersistentTasksListener),
wrappedListener::onFailure
);
/*
<2> Handle the cluster response and act accordingly
<.1>
If we are enabling the option, we need to isolate the datafeeds so we can unassign the ML Jobs
</.1>
<.2>
If we are disabling the option, we need to wait to make sure all the job and datafeed tasks no longer have the upgrade mode
assignment
We make no guarantees around which tasks will be running again once upgrade_mode is disabled.
Scenario:
* Before `upgrade_mode: true`, there were unassigned tasks because node task assignment was maxed out (tasks A, B)
* There were assigned tasks executing fine (tasks C, D)
* While `upgrade_mode: true` all are attempting to be re-assigned, but cannot and end up with the AWAITING_UPGRADE reason
* `upgrade_mode: false` opens the flood gates, all tasks are still attempting to re-assign
* A or B could be re-assigned before either C or D. Thus, previously erred tasks are now executing fine, and previously
executing tasks are now unassigned due to resource exhaustion.
We make no promises which tasks are executing if resources of the cluster are exhausted.
</.2>
</2>
*/
ActionListener<AcknowledgedResponse> clusterStateUpdateListener = ActionListener.wrap(
acknowledgedResponse -> {
// State change was not acknowledged, we either timed out or ran into some exception
// We should not continue and alert failure to the end user
if (acknowledgedResponse.isAcknowledged() == false) {
wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state"));
return;
}
// Did we change from disabled -> enabled?
if (request.isEnabled()) {
isolateDatafeeds(tasksCustomMetaData, isolateDatafeedListener);
} else {
persistentTasksService.waitForPersistentTasksCondition(
(persistentTasksCustomMetaData) ->
// Wait for jobs to not be "Awaiting upgrade"
persistentTasksCustomMetaData.findTasks(JOB_TASK_NAME,
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty() &&
// Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge
// If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the
// job's task allocationId could have changed during either process.
persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME,
(t) ->
t.getAssignment().equals(AWAITING_UPGRADE) ||
t.getAssignment().getExplanation().contains("state is stale"))
.isEmpty(),
request.timeout(),
ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure)
);
}
},
wrappedListener::onFailure
);
//<1> Change MlMetadata to indicate that upgrade_mode is now enabled
clusterService.submitStateUpdateTask("ml-set-upgrade-mode",
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, clusterStateUpdateListener) {
@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata.Builder builder = new MlMetadata.Builder(currentState.metaData().custom(MlMetadata.TYPE));
builder.isUpgradeMode(request.isEnabled());
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
return newState.build();
}
});
}
@Override
protected ClusterBlockException checkBlock(SetUpgradeModeAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
/**
* Unassigns all Job and Datafeed tasks.
* <p>
* The reason for unassigning both types is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is
* disabled.
* <p>
* If we do not force an allocation change for the Datafeed tasks, they will never start again, since they were isolated.
* <p>
* Datafeed tasks keep the state as `started` and Jobs stay `opened`
*
* @param tasksCustomMetaData Current state of persistent tasks
* @param listener Alerted when tasks are unassignd
*/
private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMetaData,
ActionListener<List<PersistentTask<?>>> listener) {
List<PersistentTask<?>> datafeedAndJobTasks = tasksCustomMetaData
.tasks()
.stream()
.filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) ||
persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME)))
.collect(Collectors.toList());
TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);
for (PersistentTask<?> task : datafeedAndJobTasks) {
chainTaskExecutor.add(
chainedTask -> persistentTasksClusterService.unassignPersistentTask(task.getId(),
task.getAllocationId(),
AWAITING_UPGRADE.getExplanation(),
chainedTask)
);
}
chainTaskExecutor.execute(listener);
}
private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData,
ActionListener<List<IsolateDatafeedAction.Response>> listener) {
Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData);
TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);
datafeedsToIsolate.forEach(datafeedId -> {
IsolateDatafeedAction.Request isolationRequest = new IsolateDatafeedAction.Request(datafeedId);
isolateDatafeedsExecutor.add(isolateListener ->
executeAsyncWithOrigin(client, ML_ORIGIN, IsolateDatafeedAction.INSTANCE, isolationRequest, isolateListener)
);
});
isolateDatafeedsExecutor.execute(listener);
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -14,6 +15,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@ -22,6 +25,8 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
public class DatafeedNodeSelector {
private static final Logger LOGGER = LogManager.getLogger(DatafeedNodeSelector.class);
@ -45,6 +50,13 @@ public class DatafeedNodeSelector {
}
public void checkDatafeedTaskCanBeCreated() {
if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
String msg = "Unable to start datafeed [" + datafeedId +"] explanation [" + AWAITING_UPGRADE.getExplanation() + "]";
LOGGER.debug(msg);
Exception detail = new IllegalStateException(msg);
throw new ElasticsearchStatusException("Could not start datafeed [" + datafeedId +"] as indices are being upgraded",
RestStatus.TOO_MANY_REQUESTS, detail);
}
AssignmentFailure assignmentFailure = checkAssignment();
if (assignmentFailure != null && assignmentFailure.isCriticalForTaskCreation) {
String msg = "No node found to start datafeed [" + datafeedId + "], " +
@ -55,6 +67,10 @@ public class DatafeedNodeSelector {
}
public PersistentTasksCustomMetaData.Assignment selectNode() {
if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
return AWAITING_UPGRADE;
}
AssignmentFailure assignmentFailure = checkAssignment();
if (assignmentFailure == null) {
return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), "");

View File

@ -54,7 +54,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class AutodetectCommunicator implements Closeable {
@ -67,7 +66,7 @@ public class AutodetectCommunicator implements Closeable {
private final StateStreamer stateStreamer;
private final DataCountsReporter dataCountsReporter;
private final AutoDetectResultProcessor autoDetectResultProcessor;
private final Consumer<Exception> onFinishHandler;
private final BiConsumer<Exception, Boolean> onFinishHandler;
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
private final boolean includeTokensField;
@ -76,7 +75,7 @@ public class AutodetectCommunicator implements Closeable {
AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor,
Consumer<Exception> onFinishHandler, NamedXContentRegistry xContentRegistry,
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.environment = environment;
@ -160,7 +159,7 @@ public class AutodetectCommunicator implements Closeable {
}
autoDetectResultProcessor.awaitCompletion();
} finally {
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null);
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true);
}
LOGGER.info("[{}] job closed", job.getId());
return null;
@ -183,6 +182,10 @@ public class AutodetectCommunicator implements Closeable {
}
public void killProcess(boolean awaitCompletion, boolean finish) throws IOException {
killProcess(awaitCompletion, finish, true);
}
public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException {
try {
processKilled = true;
autoDetectResultProcessor.setProcessKilled();
@ -198,7 +201,7 @@ public class AutodetectCommunicator implements Closeable {
}
} finally {
if (finish) {
onFinishHandler.accept(null);
onFinishHandler.accept(null, finalizeJob);
}
destroyCategorizationAnalyzer();
}

View File

@ -11,7 +11,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
@ -32,6 +35,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
@ -93,7 +97,7 @@ import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class AutodetectProcessManager {
public class AutodetectProcessManager implements ClusterStateListener {
// We should be able from the job config to estimate the memory/cpu a job needs to have,
// and if we know that then we can prior to assigning a job to a node fail based on the
@ -135,11 +139,13 @@ public class AutodetectProcessManager {
private final Auditor auditor;
private volatile boolean upgradeInProgress;
public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
NamedXContentRegistry xContentRegistry, Auditor auditor) {
NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) {
this.environment = environment;
this.client = client;
this.threadPool = threadPool;
@ -153,6 +159,7 @@ public class AutodetectProcessManager {
this.jobDataCountsPersister = jobDataCountsPersister;
this.auditor = auditor;
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
clusterService.addListener(this);
}
public void onNodeStartup() {
@ -182,6 +189,7 @@ public class AutodetectProcessManager {
.setAwaitCompletion(awaitCompletion)
.setFinish(true)
.setReason(reason)
.setShouldFinalizeJob(upgradeInProgress == false)
.kill();
} else {
// If the process is missing but the task exists this is most likely
@ -415,7 +423,7 @@ public class AutodetectProcessManager {
}
}
public void openJob(JobTask jobTask, ClusterState clusterState, Consumer<Exception> closeHandler) {
public void openJob(JobTask jobTask, ClusterState clusterState, BiConsumer<Exception, Boolean> closeHandler) {
String jobId = jobTask.getJobId();
logger.info("Opening job [{}]", jobId);
@ -426,7 +434,7 @@ public class AutodetectProcessManager {
job -> {
if (job.getJobVersion() == null) {
closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId
+ "] because jobs created prior to version 5.5 are not supported"));
+ "] because jobs created prior to version 5.5 are not supported"), true);
return;
}
@ -436,7 +444,7 @@ public class AutodetectProcessManager {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
closeHandler.accept(e);
closeHandler.accept(e, true);
}
@Override
@ -466,25 +474,25 @@ public class AutodetectProcessManager {
.kill();
processByAllocation.remove(jobTask.getAllocationId());
} finally {
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1));
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true));
}
}
}
});
}, e1 -> {
logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1));
setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1, true));
});
},
closeHandler
e -> closeHandler.accept(e, true)
));
},
closeHandler);
e -> closeHandler.accept(e, true));
// Make sure the state index and alias exist
ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasHandler),
closeHandler
e -> closeHandler.accept(e, true)
);
// Try adding the results doc mapping - this updates to the latest version if an old mapping is present
@ -492,7 +500,10 @@ public class AutodetectProcessManager {
ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler);
}
private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer<Exception> handler) {
private void createProcessAndSetRunning(ProcessContext processContext,
Job job,
AutodetectParams params,
BiConsumer<Exception, Boolean> handler) {
// At this point we lock the process context until the process has been started.
// The reason behind this is to ensure closing the job does not happen before
// the process is started as that can result to the job getting seemingly closed
@ -509,7 +520,7 @@ public class AutodetectProcessManager {
}
}
AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodetectParams, Consumer<Exception> handler) {
AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodetectParams, BiConsumer<Exception, Boolean> handler) {
// Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME
// that an open job uses, so include them too when considering if enough threads are available.
int currentRunningJobs = processByAllocation.size();
@ -771,6 +782,11 @@ public class AutodetectProcessManager {
return nativeStorageProvider.getMinLocalStorageAvailable();
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
upgradeInProgress = MlMetadata.getMlMetadata(event.state()).isUpgradeMode();
}
/*
* The autodetect native process can only handle a single operation at a time. In order to guarantee that, all
* operations are initially added to a queue and a worker thread from ml autodetect threadpool will process each

View File

@ -86,6 +86,7 @@ final class ProcessContext {
private boolean awaitCompletion;
private boolean finish;
private boolean silent;
private boolean shouldFinalizeJob = true;
private String reason;
KillBuilder setAwaitCompletion(boolean awaitCompletion) {
@ -108,6 +109,11 @@ final class ProcessContext {
return this;
}
KillBuilder setShouldFinalizeJob(boolean shouldFinalizeJob) {
this.shouldFinalizeJob = shouldFinalizeJob;
return this;
}
void kill() {
if (autodetectCommunicator == null) {
return;
@ -123,7 +129,7 @@ final class ProcessContext {
}
}
try {
autodetectCommunicator.killProcess(awaitCompletion, finish);
autodetectCommunicator.killProcess(awaitCompletion, finish, shouldFinalizeJob);
} catch (IOException e) {
LOGGER.error("[{}] Failed to kill autodetect process for job", jobId);
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestSetUpgradeModeAction extends BaseRestHandler {
private static final DeprecationLogger deprecationLogger =
new DeprecationLogger(LogManager.getLogger(RestSetUpgradeModeAction.class));
public RestSetUpgradeModeAction(Settings settings, RestController controller) {
super(settings);
// TODO: remove deprecated endpoint in 8.0.0
controller.registerWithDeprecatedHandler(
POST, MachineLearning.BASE_PATH + "set_upgrade_mode", this,
POST, MachineLearning.PRE_V7_BASE_PATH + "set_upgrade_mode", deprecationLogger);
}
@Override
public String getName() {
return "ml_set_upgrade_mode_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
SetUpgradeModeAction.Request request =
new SetUpgradeModeAction.Request(restRequest.paramAsBoolean("enabled", false));
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
return channel -> client.execute(SetUpgradeModeAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -52,6 +53,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
private DiscoveryNodes nodes;
private ClusterState clusterState;
private PersistentTasksCustomMetaData tasks;
private MlMetadata mlMetadata;
@Before
public void init() {
@ -60,6 +62,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build();
mlMetadata = new MlMetadata.Builder().build();
}
public void testSelectNode_GivenJobIsOpened() {
@ -283,6 +286,39 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
+ "[cannot start datafeed [datafeed_id] because index [not_foo] does not exist, is closed, or is still initializing.]"));
}
public void testSelectNode_GivenMlUpgradeMode() {
Job job = createScheduledJob("job_id").build(new Date());
DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"));
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
tasks = tasksBuilder.build();
mlMetadata = new MlMetadata.Builder().isUpgradeMode(true).build();
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result =
new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices()).selectNode();
assertThat(result, equalTo(MlTasks.AWAITING_UPGRADE));
}
public void testCheckDatafeedTaskCanBeCreated_GivenMlUpgradeMode() {
Job job = createScheduledJob("job_id").build(new Date());
DatafeedConfig df = createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"));
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
tasks = tasksBuilder.build();
mlMetadata = new MlMetadata.Builder().isUpgradeMode(true).build();
givenClusterState("foo", 1, 0);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, df.getId(), df.getJobId(), df.getIndices())
.checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), equalTo("Could not start datafeed [datafeed_id] as indices are being upgraded"));
}
private void givenClusterState(String index, int numberOfShards, int numberOfReplicas) {
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(1);
states.add(new Tuple<>(0, ShardRoutingState.STARTED));
@ -299,6 +335,7 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
clusterState = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder()
.putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
.putCustom(MlMetadata.TYPE, mlMetadata)
.put(indexMetaData, false))
.nodes(nodes)
.routingTable(generateRoutingTable(indexMetaData, states))

View File

@ -48,7 +48,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.hamcrest.Matchers.equalTo;
@ -193,7 +193,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
AtomicBoolean finishCalled = new AtomicBoolean(false);
AutodetectCommunicator communicator = createAutodetectCommunicator(executorService, process, resultProcessor,
e -> finishCalled.set(true));
(e, b) -> finishCalled.set(true));
boolean awaitCompletion = randomBoolean();
boolean finish = randomBoolean();
communicator.killProcess(awaitCompletion, finish);
@ -233,7 +233,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
@SuppressWarnings("unchecked")
private AutodetectCommunicator createAutodetectCommunicator(ExecutorService executorService, AutodetectProcess autodetectProcess,
AutoDetectResultProcessor autoDetectResultProcessor,
Consumer<Exception> finishHandler) throws IOException {
BiConsumer<Exception, Boolean> finishHandler) throws IOException {
DataCountsReporter dataCountsReporter = mock(DataCountsReporter.class);
doAnswer(invocation -> {
((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true);
@ -259,7 +259,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
return null;
}).when(executorService).execute(any(Runnable.class));
return createAutodetectCommunicator(executorService, autodetectProcess, autoDetectResultProcessor, e -> {});
return createAutodetectCommunicator(executorService, autodetectProcess, autoDetectResultProcessor, (e, b) -> {});
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -116,6 +117,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private NormalizerFactory normalizerFactory;
private Auditor auditor;
private ClusterState clusterState;
private ClusterService clusterService;
private DataCounts dataCounts = new DataCounts("foo");
private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
@ -135,6 +137,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
auditor = mock(Auditor.class);
clusterService = mock(ClusterService.class);
MetaData metaData = mock(MetaData.class);
SortedMap<String, AliasOrIndex> aliasOrIndexSortedMap = new TreeMap<>();
aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class));
@ -184,7 +187,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any());
@ -210,7 +213,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn(job.getId());
AtomicReference<Exception> errorHolder = new AtomicReference<>();
manager.openJob(jobTask, clusterState, errorHolder::set);
manager.openJob(jobTask, clusterState, (e, b) -> errorHolder.set(e));
Exception error = errorHolder.get();
assertThat(error, is(notNullValue()));
assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported"));
@ -245,7 +248,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor));
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService));
doReturn(executorService).when(manager).createAutodetectExecutorService(any());
doAnswer(invocationOnMock -> {
@ -256,22 +259,22 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("bar");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("baz");
when(jobTask.getAllocationId()).thenReturn(2L);
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
assertEquals(3, manager.numberOfOpenJobs());
Exception[] holder = new Exception[1];
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foobar");
when(jobTask.getAllocationId()).thenReturn(3L);
manager.openJob(jobTask, clusterState, e -> holder[0] = e);
manager.openJob(jobTask, clusterState, (e, b) -> holder[0] = e);
Exception e = holder[0];
assertEquals("max running job capacity [3] reached", e.getMessage());
@ -280,7 +283,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("baz");
manager.closeJob(jobTask, false, null);
assertEquals(2, manager.numberOfOpenJobs());
manager.openJob(jobTask, clusterState, e1 -> {});
manager.openJob(jobTask, clusterState, (e1, b) -> {});
assertEquals(3, manager.numberOfOpenJobs());
}
@ -292,7 +295,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
params, (dataCounts1, e) -> {});
assertEquals(1, manager.numberOfOpenJobs());
@ -315,7 +318,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
Exception[] holder = new Exception[1];
manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
assertNotNull(holder[0]);
@ -328,7 +331,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -356,7 +359,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -398,13 +401,13 @@ public class AutodetectProcessManagerTests extends ESTestCase {
doAnswer(invocationOnMock -> {
killLatch.countDown();
return null;
}).when(communicator).killProcess(anyBoolean(), anyBoolean());
}).when(communicator).killProcess(anyBoolean(), anyBoolean(), anyBoolean());
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfOpenJobs());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -433,7 +436,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
InputStream inputStream = createInputStream("");
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> {});
verify(communicator).writeToJob(same(inputStream), same(analysisRegistry), same(xContentType), same(params), any());
}
@ -445,7 +448,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
InputStream inputStream = createInputStream("");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -485,7 +488,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
// create a jobtask
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class),
(dataCounts1, e) -> {
});
@ -525,7 +528,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -543,7 +546,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
@ -551,7 +554,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.killAllProcessesOnThisNode();
verify(communicator).killProcess(false, false);
verify(communicator).killProcess(false, false, true);
}
public void testKillingAMissingJobFinishesTheTask() throws IOException {
@ -577,7 +580,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
InputStream inputStream = createInputStream("");
DataCounts[] dataCounts = new DataCounts[1];
manager.processData(jobTask, analysisRegistry, inputStream,
@ -607,12 +610,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
(j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY,
client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("my_id");
expectThrows(EsRejectedExecutionException.class,
() -> manager.create(jobTask, job, buildAutodetectParams(), e -> {}));
() -> manager.create(jobTask, job, buildAutodetectParams(), (e, b) -> {}));
verify(autodetectProcess, times(1)).close();
}
@ -622,7 +625,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, createJobDetails("foo"), buildAutodetectParams(), e -> {});
manager.create(jobTask, createJobDetails("foo"), buildAutodetectParams(), (e, b) -> {});
String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]";
verify(auditor).info("foo", expectedNotification);
@ -638,7 +641,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, createJobDetails("foo"), buildAutodetectParams(), e -> {});
manager.create(jobTask, createJobDetails("foo"), buildAutodetectParams(), (e, b) -> {});
String expectedNotification = "Loading model snapshot [snapshot-1] with " +
"latest_record_timestamp [1970-01-01T00:00:00.000Z], " +
@ -657,7 +660,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, createJobDetails("foo"), buildAutodetectParams(), e -> {});
manager.create(jobTask, createJobDetails("foo"), buildAutodetectParams(), (e, b) -> {});
String expectedNotification = "Loading model snapshot [N/A], " +
"job latest_record_timestamp [1970-01-01T00:00:00.000Z]";
@ -706,7 +709,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
(j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
return new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager,
jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);
}
private AutodetectParams buildAutodetectParams() {
@ -732,7 +735,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY,
client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister,
autodetectProcessFactory, normalizerFactory,
new NamedXContentRegistry(Collections.emptyList()), auditor);
new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), any(), eq(buildAutodetectParams()), any());
return manager;
@ -742,7 +745,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn(jobId);
manager.openJob(jobTask, clusterState, e -> {});
manager.openJob(jobTask, clusterState, (e, b) -> {});
manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts, e) -> {});
return manager;

View File

@ -0,0 +1,21 @@
{
"ml.set_upgrade_mode": {
"documentation": "TODO",
"methods": [ "POST" ],
"url": {
"path": "/_ml/set_upgrade_mode",
"paths": [ "/_ml/set_upgrade_mode" ],
"params": {
"enabled": {
"type": "boolean",
"description": "Whether to enable upgrade_mode ML setting or not. Defaults to false."
},
"timeout": {
"type": "time",
"description": "Controls the time to wait before action times out. Defaults to 30 seconds"
}
}
},
"body": null
}
}

View File

@ -15,6 +15,7 @@ teardown:
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits: {} }
- match: { upgrade_mode: false }
- do:
cluster.put_settings:
@ -29,6 +30,7 @@ teardown:
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits.max_model_memory_limit: "512mb" }
- match: { upgrade_mode: false }
- do:
cluster.put_settings:
@ -43,3 +45,4 @@ teardown:
- match: { defaults.anomaly_detectors.model_snapshot_retention_days: 1 }
- match: { defaults.datafeeds.scroll_size: 1000 }
- match: { limits.max_model_memory_limit: "6gb" }
- match: { upgrade_mode: false }

View File

@ -0,0 +1,212 @@
---
setup:
- skip:
features: headers
- do:
indices.create:
index: airline-data
body:
mappings:
response:
properties:
time:
type: date
airline:
type: keyword
airport:
type: text
responsetime:
type: float
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_job:
job_id: set-upgrade-mode-job
body: >
{
"job_id":"set-upgrade-mode-job",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
body: >
{
"job_id":"set-upgrade-mode-job",
"indexes":["airline-data"]
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.open_job:
job_id: set-upgrade-mode-job
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
---
teardown:
- skip:
features: headers
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.set_upgrade_mode:
enabled: false
---
"Test setting upgrade_mode to false when it is already false":
- do:
ml.set_upgrade_mode:
enabled: false
- match: { acknowledged: true }
- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/
- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/
- do:
ml.info: {}
- match: { upgrade_mode: false }
---
"Setting upgrade_mode to enabled":
- do:
ml.info: {}
- match: { upgrade_mode: false }
- do:
ml.set_upgrade_mode:
enabled: true
- match: { acknowledged: true }
- do:
ml.get_job_stats:
job_id: set-upgrade-mode-job
- match: { jobs.0.state: "opened" }
- match: { jobs.0.assignment_explanation: "persistent task cannot be assigned while upgrade mode is enabled." }
- do:
ml.get_datafeed_stats:
datafeed_id: set-upgrade-mode-job-datafeed
- match: { datafeeds.0.state: "started" }
- match: { datafeeds.0.assignment_explanation: "persistent task cannot be assigned while upgrade mode is enabled." }
- do:
tasks.list:
actions: "xpack/ml/job*,xpack/ml/datafeed*"
- match: { nodes: { } }
- do:
ml.info: {}
- match: { upgrade_mode: true }
---
"Setting upgrade mode to disabled from enabled":
- do:
ml.set_upgrade_mode:
enabled: true
- match: { acknowledged: true }
- do:
ml.get_job_stats:
job_id: set-upgrade-mode-job
- match: { jobs.0.state: "opened" }
- match: { jobs.0.assignment_explanation: "persistent task cannot be assigned while upgrade mode is enabled." }
- do:
ml.get_datafeed_stats:
datafeed_id: set-upgrade-mode-job-datafeed
- match: { datafeeds.0.state: "started" }
- match: { datafeeds.0.assignment_explanation: "persistent task cannot be assigned while upgrade mode is enabled." }
- do:
tasks.list:
actions: "xpack/ml/job*,xpack/ml/datafeed*"
- match: { nodes: { } }
- do:
ml.info: {}
- match: { upgrade_mode: true }
- do:
ml.set_upgrade_mode:
enabled: false
- match: { acknowledged: true }
- do:
ml.get_job_stats:
job_id: set-upgrade-mode-job
- match: { jobs.0.state: "opened" }
- match: { jobs.0.assignment_explanation: "" }
- do:
ml.get_datafeed_stats:
datafeed_id: set-upgrade-mode-job-datafeed
- match: { datafeeds.0.state: "started" }
- match: { datafeeds.0.assignment_explanation: "" }
- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/
- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/
---
"Attempt to open job when upgrade_mode is enabled":
- do:
ml.set_upgrade_mode:
enabled: true
- match: { acknowledged: true }
- do:
ml.put_job:
job_id: failing-set-upgrade-mode-job
body: >
{
"job_id":"failing-set-upgrade-mode-job",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"format":"xcontent",
"time_field":"time",
"time_format":"epoch"
}
}
- do:
catch: /Cannot open jobs when upgrade mode is enabled/
ml.open_job:
job_id: failing-set-upgrade-mode-job