[ML] Allow stop unassigned datafeed and relax unset upgrade mode wait (#39034)

These two changes are interlinked.

Before this change unsetting ML upgrade mode would wait for all
datafeeds to be assigned and not waiting for their corresponding
jobs to initialise.  However, this could be inappropriate, if
there was a reason other that upgrade mode why one job was unable
to be assigned or slow to start up.  Unsetting of upgrade mode
would hang in this case.

This change relaxes the condition for considering upgrade mode to
be unset to simply that an assignment attempt has been made for
each ML persistent task that did not fail because upgrade mode
was enabled.  Thus after unsetting upgrade mode there is no
guarantee that every ML persistent task is assigned, just that
each is not unassigned due to upgrade mode.

In order to make setting upgrade mode work immediately after
unsetting upgrade mode it was then also necessary to make it
possible to stop a datafeed that was not assigned.  There was
no particularly good reason why this was not allowed in the past.
It is trivial to stop an unassigned datafeed because it just
involves removing the persistent task.
This commit is contained in:
David Roberts 2019-02-19 13:47:47 +00:00
parent d8852b83d0
commit bbcdea43c5
4 changed files with 34 additions and 40 deletions

View File

@ -197,13 +197,9 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
(t) -> t.getAssignment().equals(AWAITING_UPGRADE)) (t) -> t.getAssignment().equals(AWAITING_UPGRADE))
.isEmpty() && .isEmpty() &&
// Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge // Wait for datafeeds to not be "Awaiting upgrade"
// If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the
// job's task allocationId could have changed during either process.
persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME, persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME,
(t) -> (t) -> t.getAssignment().equals(AWAITING_UPGRADE))
t.getAssignment().equals(AWAITING_UPGRADE) ||
t.getAssignment().getExplanation().contains("state is stale"))
.isEmpty(), .isEmpty(),
request.timeout(), request.timeout(),
ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure) ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure)

View File

@ -29,7 +29,6 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@ -104,7 +103,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
final DiscoveryNodes nodes = state.nodes(); final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) { if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates stop datafeed to elected master node, so it becomes the coordinating node. // Delegates stop datafeed to elected master node, so it becomes the coordinating node.
// See comment in StartDatafeedAction.Transport class for more information. // See comment in TransportStartDatafeedAction for more information.
if (nodes.getMasterNode() == null) { if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master node")); listener.onFailure(new MasterNotDiscoveredException("no known master node"));
} else { } else {
@ -142,13 +141,21 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
Set<String> executorNodes = new HashSet<>(); Set<String> executorNodes = new HashSet<>();
for (String datafeedId : startedDatafeeds) { for (String datafeedId : startedDatafeeds) {
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
if (datafeedTask == null || datafeedTask.isAssigned() == false) { if (datafeedTask == null) {
String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
" Use force stop to stop the datafeed"; String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
listener.onFailure(ExceptionsHelper.conflictStatusException(message)); assert datafeedTask != null : msg;
return; logger.error(msg);
} else { } else if (datafeedTask.isAssigned()) {
executorNodes.add(datafeedTask.getExecutorNode()); executorNodes.add(datafeedTask.getExecutorNode());
} else {
// This is the easy case - the datafeed is not currently assigned to a node,
// so can be gracefully stopped simply by removing its persistent task. (Usually
// a graceful stop cannot be achieved by simply removing the persistent task, but
// if the datafeed has no running code then graceful/forceful are the same.)
// The listener here can be a no-op, as waitForDatafeedStopped() already waits for
// these persistent tasks to disappear.
persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {}));
} }
} }
@ -198,9 +205,10 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
} }
}); });
} else { } else {
String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
"datafeed's task could not be found."; String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
logger.warn(msg); assert datafeedTask != null : msg;
logger.error(msg);
final int slot = counter.incrementAndGet(); final int slot = counter.incrementAndGet();
failures.set(slot - 1, new RuntimeException(msg)); failures.set(slot - 1, new RuntimeException(msg));
if (slot == startedDatafeeds.size()) { if (slot == startedDatafeeds.size()) {
@ -248,19 +256,18 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
private void sendResponseOrFailure(String datafeedId, ActionListener<StopDatafeedAction.Response> listener, private void sendResponseOrFailure(String datafeedId, ActionListener<StopDatafeedAction.Response> listener,
AtomicArray<Exception> failures) { AtomicArray<Exception> failures) {
List<Exception> catchedExceptions = failures.asList(); List<Exception> caughtExceptions = failures.asList();
if (catchedExceptions.size() == 0) { if (caughtExceptions.size() == 0) {
listener.onResponse(new StopDatafeedAction.Response(true)); listener.onResponse(new StopDatafeedAction.Response(true));
return; return;
} }
String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size()
+ "] failures, rethrowing last, all Exceptions: [" + "] failures, rethrowing last, all Exceptions: ["
+ catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+ "]"; + "]";
ElasticsearchException e = new ElasticsearchException(msg, ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
catchedExceptions.get(0));
listener.onFailure(e); listener.onFailure(e);
} }

View File

@ -157,22 +157,15 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
// Can't normal stop an unassigned datafeed // An unassigned datafeed can be stopped either normally or by force
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, stopDatafeedRequest.setForce(randomBoolean());
() -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet());
assertEquals("Cannot stop datafeed [" + datafeedId +
"] because the datafeed does not have an assigned node. Use force stop to stop the datafeed",
statusException.getMessage());
// Can only force stop an unassigned datafeed
stopDatafeedRequest.setForce(true);
StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
assertTrue(stopDatafeedResponse.isStopped()); assertTrue(stopDatafeedResponse.isStopped());
// Can't normal stop an unassigned job // Can't normal stop an unassigned job
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
statusException = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class,
() -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet()); () -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet());
assertEquals("Cannot close job [" + jobId + assertEquals("Cannot close job [" + jobId +
"] because the job does not have an assigned node. Use force close to close the job", "] because the job does not have an assigned node. Use force close to close the job",

View File

@ -206,7 +206,11 @@ teardown:
ml.get_datafeed_stats: ml.get_datafeed_stats:
datafeed_id: set-upgrade-mode-job-datafeed datafeed_id: set-upgrade-mode-job-datafeed
- match: { datafeeds.0.state: "started" } - match: { datafeeds.0.state: "started" }
- match: { datafeeds.0.assignment_explanation: "" } # The datafeed will not be assigned until the job has updated its status on the node it's assigned
# to, and that probably won't happen in time for this assertion. That is indicated by an assignment
# reason ending "state is stale". However, the datafeed should NOT be unassigned with a reason of
# "upgrade mode is enabled" - that reason should have gone away before this test.
- match: { datafeeds.0.assignment_explanation: /(^$|.+job.+state.is.stale)/ }
- do: - do:
cat.tasks: {} cat.tasks: {}
@ -214,12 +218,6 @@ teardown:
$body: | $body: |
/.+job.+/ /.+job.+/
- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/
--- ---
"Attempt to open job when upgrade_mode is enabled": "Attempt to open job when upgrade_mode is enabled":
- do: - do: