[7.x] Do not execute ML CRUD actions when upgrade mode is enabled (#54437) (#55049)

This commit is contained in:
Przemysław Witek 2020-04-10 16:07:11 +02:00 committed by GitHub
parent c440754784
commit 17101d86d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 379 additions and 51 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
@ -34,6 +35,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
@ -56,6 +58,7 @@ import java.util.Map;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.hamcrest.Matchers.is;
/**
* Base class of ML integration tests that use a native autodetect process
@ -111,6 +114,7 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
}
protected void cleanUp() {
setUpgradeModeTo(false);
cleanUpResources();
waitForPendingTasks();
}
@ -129,6 +133,19 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
}
}
protected void setUpgradeModeTo(boolean enabled) {
AcknowledgedResponse response =
client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(enabled)).actionGet();
assertThat(response.isAcknowledged(), is(true));
assertThat(upgradeMode(), is(enabled));
}
protected boolean upgradeMode() {
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
MlMetadata mlMetadata = MlMetadata.getMlMetadata(masterClusterState);
return mlMetadata.isUpgradeMode();
}
protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request()).get();

View File

@ -6,19 +6,19 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;
@ -31,8 +31,10 @@ import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createSched
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDatafeedStats;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
@ -49,35 +51,31 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
String datafeedId = jobId + "-datafeed";
startRealtime(jobId);
assertThat(upgradeMode(), is(false));
// Assert appropriate task state and assignment numbers
assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(2));
.getTasks(), hasSize(2));
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
PersistentTasksCustomMetadata persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));
// Set the upgrade mode setting
AcknowledgedResponse response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true))
.actionGet();
assertThat(response.isAcknowledged(), equalTo(true));
setUpgradeModeTo(true);
masterClusterState = client().admin().cluster().prepareState().all().get().getState();
// Assert state for tasks still exists and that the upgrade setting is set
persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(true));
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));
assertThat(client().admin()
.cluster()
@ -87,50 +85,81 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
.getTasks(), is(empty()));
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0);
assertThat(jobStats.getState(), equalTo(JobState.OPENED));
assertThat(jobStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation()));
assertThat(jobStats.getState(), is(equalTo(JobState.OPENED)));
assertThat(jobStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(jobStats.getNode(), is(nullValue()));
GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats = getDatafeedStats(datafeedId);
assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
assertThat(datafeedStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation()));
assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED)));
assertThat(datafeedStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(datafeedStats.getNode(), is(nullValue()));
Job.Builder job = createScheduledJob("job-should-not-open");
registerJob(job);
putJob(job);
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> openJob(job.getId()));
assertThat(statusException.status(), equalTo(RestStatus.TOO_MANY_REQUESTS));
assertThat(statusException.getMessage(), equalTo("Cannot open jobs when upgrade mode is enabled"));
//Disable the setting
response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(false))
.actionGet();
assertThat(response.isAcknowledged(), equalTo(true));
// Disable the setting
setUpgradeModeTo(false);
masterClusterState = client().admin().cluster().prepareState().all().get().getState();
persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));
assertBusy(() -> assertThat(client().admin()
.cluster()
.prepareListTasks()
.setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
.get()
.getTasks()
.size(), equalTo(2)));
.getTasks(), hasSize(2)));
jobStats = getJobStats(jobId).get(0);
assertThat(jobStats.getState(), equalTo(JobState.OPENED));
assertThat(jobStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(jobStats.getState(), is(equalTo(JobState.OPENED)));
assertThat(jobStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
datafeedStats = getDatafeedStats(datafeedId);
assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
assertThat(datafeedStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED)));
assertThat(datafeedStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
}
public void testJobOpenActionInUpgradeMode() {
String jobId = "job-should-not-open";
Job.Builder job = createScheduledJob(jobId);
registerJob(job);
putJob(job);
setUpgradeModeTo(true);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> openJob(jobId));
assertThat(e.getMessage(), is(equalTo("Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled")));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
}
public void testAnomalyDetectionActionsInUpgradeMode() {
setUpgradeModeTo(true);
String jobId = "job_id";
expectThrowsUpgradeModeException(() -> putJob(createScheduledJob(jobId)));
expectThrowsUpgradeModeException(() -> updateJob(jobId, new JobUpdate.Builder(jobId).build()));
expectThrowsUpgradeModeException(() -> deleteJob(jobId));
expectThrowsUpgradeModeException(() -> openJob(jobId));
expectThrowsUpgradeModeException(() -> flushJob(jobId, false));
expectThrowsUpgradeModeException(() -> closeJob(jobId));
expectThrowsUpgradeModeException(() -> persistJob(jobId));
expectThrowsUpgradeModeException(() -> forecast(jobId, null, null));
String snapshotId = "snapshot_id";
expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId));
String datafeedId = "datafeed_id";
expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index"))));
expectThrowsUpgradeModeException(() -> updateDatafeed(new DatafeedUpdate.Builder(datafeedId).build()));
expectThrowsUpgradeModeException(() -> deleteDatafeed(datafeedId));
expectThrowsUpgradeModeException(() -> startDatafeed(datafeedId, 0, null));
expectThrowsUpgradeModeException(() -> stopDatafeed(datafeedId));
String filterId = "filter_id";
expectThrowsUpgradeModeException(() -> putMlFilter(MlFilter.builder(filterId).build()));
String calendarId = "calendar_id";
expectThrowsUpgradeModeException(() -> putCalendar(calendarId, Collections.singletonList(jobId), ""));
}
private void startRealtime(String jobId) throws Exception {
@ -154,8 +183,8 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
startDatafeed(datafeedConfig.getId(), 0L, null);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1)));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L)));
});
long numDocs2 = randomIntBetween(2, 64);
@ -163,9 +192,14 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1 + numDocs2)));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L)));
}, 30, TimeUnit.SECONDS);
}
private static void expectThrowsUpgradeModeException(ThrowingRunnable actionInvocation) {
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, actionInvocation);
assertThat(e.getMessage(), containsString("upgrade mode is enabled"));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
}
}

View File

@ -11,6 +11,7 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.client.node.NodeClient;
@ -318,6 +319,7 @@ import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
public class MachineLearning extends Plugin implements SystemIndexPlugin, AnalysisPlugin, IngestPlugin, PersistentTaskPlugin {
public static final String NAME = "ml";
@ -415,6 +417,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();
public MachineLearning(Settings settings, Path configPath) {
this.settings = settings;
@ -517,9 +520,11 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
IndexNameExpressionResolver indexNameExpressionResolver) {
if (enabled == false || transportClientMode) {
// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled
return Collections.singletonList(new JobManagerHolder());
return singletonList(new JobManagerHolder());
}
this.mlUpgradeModeActionFilter.set(new MlUpgradeModeActionFilter(clusterService));
new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
@ -870,6 +875,15 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
);
}
@Override
public List<ActionFilter> getActionFilters() {
if (enabled == false) {
return emptyList();
}
return singletonList(this.mlUpgradeModeActionFilter.get());
}
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (false == enabled || transportClientMode) {

View File

@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link MlUpgradeModeActionFilter} disallows certain actions if the cluster is currently in upgrade mode.
*
* Disallowed actions are the ones which can access/alter the state of ML internal indices.
*/
class MlUpgradeModeActionFilter extends ActionFilter.Simple {
private static final Set<String> ACTIONS_DISALLOWED_IN_UPGRADE_MODE =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
PutJobAction.NAME,
UpdateJobAction.NAME,
DeleteJobAction.NAME,
OpenJobAction.NAME,
FlushJobAction.NAME,
CloseJobAction.NAME,
PersistJobAction.NAME,
FinalizeJobExecutionAction.NAME,
PostDataAction.NAME,
RevertModelSnapshotAction.NAME,
UpdateModelSnapshotAction.NAME,
DeleteModelSnapshotAction.NAME,
PutDatafeedAction.NAME,
UpdateDatafeedAction.NAME,
DeleteDatafeedAction.NAME,
StartDatafeedAction.NAME,
StopDatafeedAction.NAME,
PutFilterAction.NAME,
UpdateFilterAction.NAME,
DeleteFilterAction.NAME,
PutCalendarAction.NAME,
UpdateCalendarJobAction.NAME,
PostCalendarEventsAction.NAME,
DeleteCalendarAction.NAME,
DeleteCalendarEventAction.NAME,
UpdateProcessAction.NAME,
KillProcessAction.NAME,
DeleteExpiredDataAction.NAME,
ForecastJobAction.NAME,
DeleteForecastAction.NAME,
PutDataFrameAnalyticsAction.NAME,
DeleteDataFrameAnalyticsAction.NAME,
StartDataFrameAnalyticsAction.NAME,
StopDataFrameAnalyticsAction.NAME,
PutTrainedModelAction.NAME,
DeleteTrainedModelAction.NAME
)));
private final AtomicBoolean isUpgradeMode = new AtomicBoolean();
MlUpgradeModeActionFilter(ClusterService clusterService) {
Objects.requireNonNull(clusterService);
clusterService.addListener(this::setIsUpgradeMode);
}
@Override
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
if (isUpgradeMode.get() && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) {
throw new ElasticsearchStatusException(
"Cannot perform {} action while upgrade mode is enabled", RestStatus.TOO_MANY_REQUESTS, action);
}
return true;
}
/**
* To prevent leaking information to unauthorized users, it is extremely important that this filter is executed *after* the
* {@code SecurityActionFilter}.
* To achieve that, the number returned by this method must be greater than the number returned by the
* {@code SecurityActionFilter::order} method.
*/
@Override
public int order() {
return Integer.MAX_VALUE;
}
// Visible for testing
void setIsUpgradeMode(ClusterChangedEvent event) {
isUpgradeMode.set(MlMetadata.getMlMetadata(event.state()).isUpgradeMode());
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.security.action.filter.SecurityActionFilter;
import org.junit.After;
import org.junit.Before;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class MlUpgradeModeActionFilterTests extends ESTestCase {
private static final String DISALLOWED_ACTION = PutJobAction.NAME;
private static final String ALLOWED_ACTION = SetUpgradeModeAction.NAME;
private ClusterService clusterService;
private Task task;
private ActionRequest request;
private ActionListener<ActionResponse> listener;
private ActionFilterChain<ActionRequest, ActionResponse> chain;
@Before
@SuppressWarnings("unchecked")
public void setUpMocks() {
clusterService = mock(ClusterService.class);
task = mock(Task.class);
request = mock(ActionRequest.class);
listener = mock(ActionListener.class);
chain = mock(ActionFilterChain.class);
}
@After
public void assertNoMoreInteractions() {
verifyNoMoreInteractions(task, request, listener, chain);
}
public void testApply_ActionDisallowedInUpgradeMode() {
MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService);
filter.apply(task, DISALLOWED_ACTION, request, listener, chain);
filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true)));
ElasticsearchStatusException e =
expectThrows(
ElasticsearchStatusException.class,
() -> filter.apply(task, DISALLOWED_ACTION, request, listener, chain));
filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false)));
filter.apply(task, DISALLOWED_ACTION, request, listener, chain);
assertThat(e.getMessage(), is(equalTo("Cannot perform " + DISALLOWED_ACTION + " action while upgrade mode is enabled")));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
verify(chain, times(2)).proceed(task, DISALLOWED_ACTION, request, listener);
}
public void testApply_ActionAllowedInUpgradeMode() {
MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService);
filter.apply(task, ALLOWED_ACTION, request, listener, chain);
filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true)));
filter.apply(task, ALLOWED_ACTION, request, listener, chain);
filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false)));
filter.apply(task, ALLOWED_ACTION, request, listener, chain);
verify(chain, times(3)).proceed(task, ALLOWED_ACTION, request, listener);
}
public void testOrder_UpgradeFilterIsExecutedAfterSecurityFilter() {
MlUpgradeModeActionFilter upgradeModeFilter = new MlUpgradeModeActionFilter(clusterService);
SecurityActionFilter securityFilter = new SecurityActionFilter(null, null, null, mock(ThreadPool.class), null, null);
ActionFilter[] actionFiltersInOrderOfExecution = new ActionFilters(Sets.newHashSet(upgradeModeFilter, securityFilter)).filters();
assertThat(actionFiltersInOrderOfExecution, is(arrayContaining(securityFilter, upgradeModeFilter)));
}
private static ClusterChangedEvent createClusterChangedEvent(ClusterState clusterState) {
return new ClusterChangedEvent("created-from-test", clusterState, clusterState);
}
private static ClusterState createClusterState(boolean isUpgradeMode) {
return ClusterState.builder(new ClusterName("MlUpgradeModeActionFilterTests"))
.metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build()))
.build();
}
}

View File

@ -220,11 +220,6 @@ teardown:
---
"Attempt to open job when upgrade_mode is enabled":
- do:
ml.set_upgrade_mode:
enabled: true
- match: { acknowledged: true }
- do:
ml.put_job:
job_id: failing-set-upgrade-mode-job
@ -243,6 +238,11 @@ teardown:
}
- do:
catch: /Cannot open jobs when upgrade mode is enabled/
ml.set_upgrade_mode:
enabled: true
- match: { acknowledged: true }
- do:
catch: /Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled/
ml.open_job:
job_id: failing-set-upgrade-mode-job