[7.x] Unassign DFA tasks in SetUpgradeModeAction (#54523) (#55143)

This commit is contained in:
Przemysław Witek 2020-04-14 14:09:02 +02:00 committed by GitHub
parent 8a669dc9b7
commit d5bb574e1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 28 deletions

View File

@@ -40,8 +40,10 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -51,6 +53,8 @@ import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.Matchers.startsWith;
public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
@@ -498,6 +502,59 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds)); assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
} }
/**
 * Verifies that enabling upgrade mode unassigns a running data frame analytics task
 * (the persistent task survives but loses its node assignment) and that disabling
 * upgrade mode lets the task get reassigned and run to completion.
 */
public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception {
initialize("classification_set_upgrade_mode");
indexData(sourceIndex, 300, 0, KEYWORD_FIELD);
assertThat(upgradeMode(), is(false));
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
// While upgrade mode is off, the task exists and is assigned to a node.
assertThat(analyticsTaskList(), hasSize(1));
assertThat(analyticsAssignedTaskList(), hasSize(1));
// Turning upgrade mode on keeps the persistent task but removes its assignment.
setUpgradeModeTo(true);
assertThat(analyticsTaskList(), hasSize(1));
assertThat(analyticsAssignedTaskList(), is(empty()));
GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
// Stats should report the task as awaiting upgrade, with no node attached.
assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
assertThat(analyticsStats.getNode(), is(nullValue()));
// Turning upgrade mode off again allows the task to be reassigned.
setUpgradeModeTo(false);
assertThat(analyticsTaskList(), hasSize(1));
// Reassignment is asynchronous, so wait for it rather than asserting immediately.
assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1)));
analyticsStats = getAnalyticsStats(jobId);
assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
waitUntilAnalyticsIsStopped(jobId);
}
/**
 * Verifies that while upgrade mode is enabled, starting a new data frame analytics job
 * is rejected with 429 TOO_MANY_REQUESTS and no persistent task is created.
 */
public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception {
initialize("classification_set_upgrade_mode_task_should_not_start");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
assertThat(upgradeMode(), is(false));
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
setUpgradeModeTo(true);
// The start request must fail fast while upgrade mode is on.
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(config.getId()));
assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
assertThat(
e.getMessage(),
is(equalTo("Cannot perform cluster:admin/xpack/ml/data_frame/analytics/start action while upgrade mode is enabled")));
// No persistent task should have been created, let alone assigned.
assertThat(analyticsTaskList(), is(empty()));
assertThat(analyticsAssignedTaskList(), is(empty()));
}
public void testDeleteExpiredData_RemovesUnusedState() throws Exception { public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("classification_delete_expired_data"); initialize("classification_delete_expired_data");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD); indexData(sourceIndex, 100, 0, KEYWORD_FIELD);

View File

@@ -15,13 +15,17 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
@@ -48,6 +52,8 @@ import org.hamcrest.Matchers;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -60,6 +66,7 @@ import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@@ -152,7 +159,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
GetDataFrameAnalyticsStatsAction.Response response = client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request) GetDataFrameAnalyticsStatsAction.Response response = client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE, request)
.actionGet(); .actionGet();
List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = response.getResponse().results(); List<GetDataFrameAnalyticsStatsAction.Response.Stats> stats = response.getResponse().results();
assertThat("Got: " + stats.toString(), stats.size(), equalTo(1)); assertThat("Got: " + stats.toString(), stats, hasSize(1));
return stats.get(0); return stats.get(0);
} }
@@ -196,7 +203,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
assertThat(stats.getId(), equalTo(id)); assertThat(stats.getId(), equalTo(id));
List<PhaseProgress> progress = stats.getProgress(); List<PhaseProgress> progress = stats.getProgress();
assertThat(progress.size(), equalTo(4)); assertThat(progress, hasSize(4));
assertThat(progress.get(0).getPhase(), equalTo("reindexing")); assertThat(progress.get(0).getPhase(), equalTo("reindexing"));
assertThat(progress.get(1).getPhase(), equalTo("loading_data")); assertThat(progress.get(1).getPhase(), equalTo("loading_data"));
assertThat(progress.get(2).getPhase(), equalTo("analyzing")); assertThat(progress.get(2).getPhase(), equalTo("analyzing"));
@@ -221,6 +228,18 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), arrayWithSize(1)); assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), arrayWithSize(1));
} }
/**
 * Fetches all data frame analytics persistent tasks from the master cluster state,
 * whether or not they are currently assigned to a node.
 *
 * @return the matching persistent tasks, or an empty collection if no persistent
 *         tasks metadata exists in the cluster state yet
 */
protected Collection<PersistentTasksCustomMetadata.PersistentTask<?>> analyticsTaskList() {
    ClusterState state = client().admin().cluster().prepareState().all().get().getState();
    PersistentTasksCustomMetadata tasksMetadata = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
    if (tasksMetadata == null) {
        return Collections.emptyList();
    }
    return tasksMetadata.findTasks(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> true);
}
/**
 * Lists the node-level tasks currently executing data frame analytics, i.e. only
 * those persistent tasks that have actually been assigned and started on a node.
 */
protected List<TaskInfo> analyticsAssignedTaskList() {
    String analyticsTaskAction = MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME + "[c]";
    return client().admin().cluster().prepareListTasks().setActions(analyticsTaskAction).get().getTasks();
}
/** /**
* Asserts whether the audit messages fetched from index match provided prefixes. * Asserts whether the audit messages fetched from index match provided prefixes.
* More specifically, in order to pass: * More specifically, in order to pass:
@@ -284,7 +303,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setQuery(QueryBuilders.idsQuery().addIds(stateDocId)) .setQuery(QueryBuilders.idsQuery().addIds(stateDocId))
.get(); .get();
assertThat(searchResponse.getHits().getHits().length, equalTo(1)); assertThat("Hits were: " + Strings.toString(searchResponse.getHits()), searchResponse.getHits().getHits(), is(arrayWithSize(1)));
} }
protected static void assertMlResultsFieldMappings(String index, String predictedClassField, String expectedType) { protected static void assertMlResultsFieldMappings(String index, String predictedClassField, String expectedType) {

View File

@@ -40,7 +40,10 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,9 +55,13 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.core.ml.MlTasks.DATAFEED_TASK_NAME; import static org.elasticsearch.xpack.core.ml.MlTasks.DATAFEED_TASK_NAME;
import static org.elasticsearch.xpack.core.ml.MlTasks.JOB_TASK_NAME; import static org.elasticsearch.xpack.core.ml.MlTasks.JOB_TASK_NAME;
import static org.elasticsearch.xpack.core.ml.MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME;
public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<SetUpgradeModeAction.Request, AcknowledgedResponse> { public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<SetUpgradeModeAction.Request, AcknowledgedResponse> {
private static final Set<String> ML_TASK_NAMES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(JOB_TASK_NAME, DATAFEED_TASK_NAME, DATA_FRAME_ANALYTICS_TASK_NAME)));
private static final Logger logger = LogManager.getLogger(TransportSetUpgradeModeAction.class); private static final Logger logger = LogManager.getLogger(TransportSetUpgradeModeAction.class);
private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicBoolean isRunning = new AtomicBoolean(false);
@@ -124,12 +131,12 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
// <4> We have unassigned the tasks, respond to the listener. // <4> We have unassigned the tasks, respond to the listener.
ActionListener<List<PersistentTask<?>>> unassignPersistentTasksListener = ActionListener.wrap( ActionListener<List<PersistentTask<?>>> unassignPersistentTasksListener = ActionListener.wrap(
unassigndPersistentTasks -> { unassignedPersistentTasks -> {
// Wait for our tasks to all stop // Wait for our tasks to all stop
client.admin() client.admin()
.cluster() .cluster()
.prepareListTasks() .prepareListTasks()
.setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]") .setActions(ML_TASK_NAMES.stream().map(taskName -> taskName + "[c]").toArray(String[]::new))
// There is a chance that we failed un-allocating a task due to allocation_id being changed // There is a chance that we failed un-allocating a task due to allocation_id being changed
// This call will timeout in that case and return an error // This call will timeout in that case and return an error
.setWaitForCompletion(true) .setWaitForCompletion(true)
@@ -161,8 +168,8 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
If we are enabling the option, we need to isolate the datafeeds so we can unassign the ML Jobs If we are enabling the option, we need to isolate the datafeeds so we can unassign the ML Jobs
</.1> </.1>
<.2> <.2>
If we are disabling the option, we need to wait to make sure all the job and datafeed tasks no longer have the upgrade mode If we are disabling the option, we need to wait to make sure all the job, datafeed and analytics tasks no longer have the
assignment upgrade mode assignment
We make no guarantees around which tasks will be running again once upgrade_mode is disabled. We make no guarantees around which tasks will be running again once upgrade_mode is disabled.
@@ -198,16 +205,10 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
isolateDatafeeds(tasksCustomMetadata, isolateDatafeedListener); isolateDatafeeds(tasksCustomMetadata, isolateDatafeedListener);
} else { } else {
persistentTasksService.waitForPersistentTasksCondition( persistentTasksService.waitForPersistentTasksCondition(
(persistentTasksCustomMetadata) -> // Wait for jobs, datafeeds and analytics not to be "Awaiting upgrade"
// Wait for jobs to not be "Awaiting upgrade" persistentTasksCustomMetadata ->
persistentTasksCustomMetadata.findTasks(JOB_TASK_NAME, persistentTasksCustomMetadata.tasks().stream()
(t) -> t.getAssignment().equals(AWAITING_UPGRADE)) .noneMatch(t -> ML_TASK_NAMES.contains(t.getTaskName()) && t.getAssignment().equals(AWAITING_UPGRADE)),
.isEmpty() &&
// Wait for datafeeds to not be "Awaiting upgrade"
persistentTasksCustomMetadata.findTasks(DATAFEED_TASK_NAME,
(t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty(),
request.timeout(), request.timeout(),
ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure) ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure)
); );
@@ -242,9 +243,9 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
} }
/** /**
* Unassigns all Job and Datafeed tasks. * Unassigns all Job, Datafeed and Data Frame Analytics tasks.
* <p> * <p>
* The reason for unassigning both types is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is * The reason for unassigning both Job and Datafeed is that we want the Datafeed to attempt re-assignment once `upgrade_mode` is
* disabled. * disabled.
* <p> * <p>
* If we do not force an allocation change for the Datafeed tasks, they will never start again, since they were isolated. * If we do not force an allocation change for the Datafeed tasks, they will never start again, since they were isolated.
@@ -256,18 +257,17 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
*/ */
private void unassignPersistentTasks(PersistentTasksCustomMetadata tasksCustomMetadata, private void unassignPersistentTasks(PersistentTasksCustomMetadata tasksCustomMetadata,
ActionListener<List<PersistentTask<?>>> listener) { ActionListener<List<PersistentTask<?>>> listener) {
List<PersistentTask<?>> datafeedAndJobTasks = tasksCustomMetadata List<PersistentTask<?>> mlTasks = tasksCustomMetadata
.tasks() .tasks()
.stream() .stream()
.filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) || .filter(persistentTask -> ML_TASK_NAMES.contains(persistentTask.getTaskName()))
persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME)))
// We want to always have the same ordering of which tasks we un-allocate first. // We want to always have the same ordering of which tasks we un-allocate first.
// However, the order in which the distributed tasks handle the un-allocation event is not guaranteed. // However, the order in which the distributed tasks handle the un-allocation event is not guaranteed.
.sorted(Comparator.comparing(PersistentTask::getTaskName)) .sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList()); .collect(Collectors.toList());
logger.info("Un-assigning persistent tasks : " + logger.info("Un-assigning persistent tasks : " +
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]"))); mlTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));
TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor = TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
@@ -278,7 +278,7 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
// Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise. // Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
ex -> ExceptionsHelper.unwrapCause(ex) instanceof ResourceNotFoundException == false); ex -> ExceptionsHelper.unwrapCause(ex) instanceof ResourceNotFoundException == false);
for (PersistentTask<?> task : datafeedAndJobTasks) { for (PersistentTask<?> task : mlTasks) {
chainTaskExecutor.add( chainTaskExecutor.add(
chainedTask -> persistentTasksClusterService.unassignPersistentTask(task.getId(), chainedTask -> persistentTasksClusterService.unassignPersistentTask(task.getId(),
task.getAllocationId(), task.getAllocationId(),

View File

@@ -90,8 +90,6 @@ import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
/** /**
* Starts the persistent task for running data frame analytics. * Starts the persistent task for running data frame analytics.
*
* TODO Add to the upgrade mode action
*/ */
public class TransportStartDataFrameAnalyticsAction public class TransportStartDataFrameAnalyticsAction
extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, AcknowledgedResponse> { extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, AcknowledgedResponse> {

View File

@@ -53,8 +53,6 @@ import java.util.stream.Stream;
/** /**
* Stops the persistent task for running data frame analytics. * Stops the persistent task for running data frame analytics.
*
* TODO Add to the upgrade mode action
*/ */
public class TransportStopDataFrameAnalyticsAction public class TransportStopDataFrameAnalyticsAction
extends TransportTasksAction<DataFrameAnalyticsTask, StopDataFrameAnalyticsAction.Request, extends TransportTasksAction<DataFrameAnalyticsTask, StopDataFrameAnalyticsAction.Request,