From bbcdea43c54f7aa40c1298d7b34a6f185805fcff Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 19 Feb 2019 13:47:47 +0000 Subject: [PATCH] [ML] Allow stop unassigned datafeed and relax unset upgrade mode wait (#39034) These two changes are interlinked. Before this change, unsetting ML upgrade mode would wait for all datafeeds to be assigned and no longer waiting for their corresponding jobs to initialise. However, this could be inappropriate if there was a reason other than upgrade mode why one job was unable to be assigned or slow to start up. Unsetting of upgrade mode would hang in this case. This change relaxes the condition for considering upgrade mode to be unset to simply that an assignment attempt has been made for each ML persistent task that did not fail because upgrade mode was enabled. Thus, after unsetting upgrade mode there is no guarantee that every ML persistent task is assigned, just that each is not unassigned due to upgrade mode. In order to make setting upgrade mode work immediately after unsetting upgrade mode, it was then also necessary to make it possible to stop a datafeed that was not assigned. There was no particularly good reason why this was not allowed in the past. It is trivial to stop an unassigned datafeed because it just involves removing the persistent task. 
--- .../action/TransportSetUpgradeModeAction.java | 8 +--- .../action/TransportStopDatafeedAction.java | 41 +++++++++++-------- .../integration/MlDistributedFailureIT.java | 13 ++---- .../test/ml/set_upgrade_mode.yml | 12 +++--- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index d16f9e18421..58ff31a6bc8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -197,13 +197,9 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction t.getAssignment().equals(AWAITING_UPGRADE)) .isEmpty() && - // Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge - // If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the - // job's task allocationId could have changed during either process. 
+ // Wait for datafeeds to not be "Awaiting upgrade" persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME, - (t) -> - t.getAssignment().equals(AWAITING_UPGRADE) || - t.getAssignment().getExplanation().contains("state is stale")) + (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) .isEmpty(), request.timeout(), ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 636138a855b..cbd55bb60d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -104,7 +103,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction executorNodes = new HashSet<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); - if (datafeedTask == null || datafeedTask.isAssigned() == false) { - String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." 
+ - " Use force stop to stop the datafeed"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { + if (datafeedTask == null) { + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found."; + assert datafeedTask != null : msg; + logger.error(msg); + } else if (datafeedTask.isAssigned()) { executorNodes.add(datafeedTask.getExecutorNode()); + } else { + // This is the easy case - the datafeed is not currently assigned to a node, + // so can be gracefully stopped simply by removing its persistent task. (Usually + // a graceful stop cannot be achieved by simply removing the persistent task, but + // if the datafeed has no running code then graceful/forceful are the same.) + // The listener here can be a no-op, as waitForDatafeedStopped() already waits for + // these persistent tasks to disappear. 
+ persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {})); } } @@ -198,9 +205,10 @@ public class TransportStopDatafeedAction extends TransportTasksAction listener, AtomicArray failures) { - List catchedExceptions = failures.asList(); - if (catchedExceptions.size() == 0) { + List caughtExceptions = failures.asList(); + if (caughtExceptions.size() == 0) { listener.onResponse(new StopDatafeedAction.Response(true)); return; } - String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() + String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size() + "] failures, rethrowing last, all Exceptions: [" - + catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + "]"; - ElasticsearchException e = new ElasticsearchException(msg, - catchedExceptions.get(0)); + ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0)); listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index d68fe5225fb..5b9766efe00 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -157,22 +157,15 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); - // Can't normal stop an unassigned datafeed + // An unassigned datafeed can be stopped either normally or by force StopDatafeedAction.Request 
stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); - ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, - () -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet()); - assertEquals("Cannot stop datafeed [" + datafeedId + - "] because the datafeed does not have an assigned node. Use force stop to stop the datafeed", - statusException.getMessage()); - - // Can only force stop an unassigned datafeed - stopDatafeedRequest.setForce(true); + stopDatafeedRequest.setForce(randomBoolean()); StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); assertTrue(stopDatafeedResponse.isStopped()); // Can't normal stop an unassigned job CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); - statusException = expectThrows(ElasticsearchStatusException.class, + ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet()); assertEquals("Cannot close job [" + jobId + "] because the job does not have an assigned node. Use force close to close the job", diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml index 9b33af5f48b..4a93e46c6b4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml @@ -206,7 +206,11 @@ teardown: ml.get_datafeed_stats: datafeed_id: set-upgrade-mode-job-datafeed - match: { datafeeds.0.state: "started" } - - match: { datafeeds.0.assignment_explanation: "" } + # The datafeed will not be assigned until the job has updated its status on the node it's assigned + # to, and that probably won't happen in time for this assertion. 
That is indicated by an assignment + # reason ending "state is stale". However, the datafeed should NOT be unassigned with a reason of + # "upgrade mode is enabled" - that reason should have gone away before this test. + - match: { datafeeds.0.assignment_explanation: /(^$|.+job.+state.is.stale)/ } - do: cat.tasks: {} @@ -214,12 +218,6 @@ teardown: $body: | /.+job.+/ - - do: - cat.tasks: {} - - match: - $body: | - /.+datafeed.+/ - --- "Attempt to open job when upgrade_mode is enabled": - do: