ML: update set_upgrade_mode, add logging (#38372) (#38538)

* ML: update set_upgrade_mode, add logging

* Attempt to fix datafeed isolation

Also renamed a few methods/variables for clarity and added
some comments
This commit is contained in:
Benjamin Trent 2019-02-08 12:56:04 -06:00 committed by GitHub
parent 9fd99f18a0
commit 24a8ea06f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 19 deletions

View File

@ -79,6 +79,7 @@ public final class Messages {
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated";
public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''"; public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}"; public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";
public static final String JOB_AUDIT_DELETED = "Job deleted"; public static final String JOB_AUDIT_DELETED = "Job deleted";

View File

@ -45,7 +45,7 @@ public class MlLifeCycleService {
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds // datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
// could fail if they send data to a dead autodetect process. // could fail if they send data to a dead autodetect process.
if (datafeedManager != null) { if (datafeedManager != null) {
datafeedManager.isolateAllDatafeedsOnThisNode(); datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
} }
NativeController nativeController = NativeControllerHolder.getNativeController(environment); NativeController nativeController = NativeControllerHolder.getNativeController(environment);
if (nativeController != null) { if (nativeController != null) {

View File

@ -263,6 +263,9 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
.sorted(Comparator.comparing(PersistentTask::getTaskName)) .sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList()); .collect(Collectors.toList());
logger.info("Un-assigning persistent tasks : " +
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));
TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor = TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
r -> true, r -> true,
@ -287,6 +290,7 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
ActionListener<List<IsolateDatafeedAction.Response>> listener) { ActionListener<List<IsolateDatafeedAction.Response>> listener) {
Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData); Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData);
logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString());
TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor = TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true); new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);

View File

@ -64,7 +64,6 @@ public class DatafeedManager {
private final DatafeedJobBuilder datafeedJobBuilder; private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner(); private final TaskRunner taskRunner = new TaskRunner();
private final AutodetectProcessManager autodetectProcessManager; private final AutodetectProcessManager autodetectProcessManager;
private volatile boolean isolated;
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
@ -130,18 +129,20 @@ public class DatafeedManager {
* This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves * This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves
* the datafeed tasks in the "started" state, so that they get restarted on a different node. * the datafeed tasks in the "started" state, so that they get restarted on a different node.
*/ */
public void isolateAllDatafeedsOnThisNode() { public void isolateAllDatafeedsOnThisNodeBeforeShutdown() {
isolated = true;
Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator(); Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Holder next = iter.next(); Holder next = iter.next();
next.isolateDatafeed(); next.isolateDatafeed();
next.setRelocating(); // TODO: it's not ideal that this "isolate" method does something a bit different to the one below
next.setNodeIsShuttingDown();
iter.remove(); iter.remove();
} }
} }
public void isolateDatafeed(long allocationId) { public void isolateDatafeed(long allocationId) {
// This calls get() rather than remove() because we expect that the persistent task will
// be removed shortly afterwards and that operation needs to be able to find the holder
Holder holder = runningDatafeedsOnThisNode.get(allocationId); Holder holder = runningDatafeedsOnThisNode.get(allocationId);
if (holder != null) { if (holder != null) {
holder.isolateDatafeed(); holder.isolateDatafeed();
@ -195,7 +196,7 @@ public class DatafeedManager {
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return; return;
} }
if (isolated == false) { if (holder.isIsolated() == false) {
if (next != null) { if (next != null) {
doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder);
} else { } else {
@ -298,7 +299,7 @@ public class DatafeedManager {
private final ProblemTracker problemTracker; private final ProblemTracker problemTracker;
private final Consumer<Exception> finishHandler; private final Consumer<Exception> finishHandler;
volatile Scheduler.Cancellable cancellable; volatile Scheduler.Cancellable cancellable;
private volatile boolean isRelocating; private volatile boolean isNodeShuttingDown;
Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob,
ProblemTracker problemTracker, Consumer<Exception> finishHandler) { ProblemTracker problemTracker, Consumer<Exception> finishHandler) {
@ -324,7 +325,7 @@ public class DatafeedManager {
} }
public void stop(String source, TimeValue timeout, Exception e) { public void stop(String source, TimeValue timeout, Exception e) {
if (isRelocating) { if (isNodeShuttingDown) {
return; return;
} }
@ -344,11 +345,12 @@ public class DatafeedManager {
if (cancellable != null) { if (cancellable != null) {
cancellable.cancel(); cancellable.cancel();
} }
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); auditor.info(datafeedJob.getJobId(),
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
finishHandler.accept(e); finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
if (autoCloseJob) { if (autoCloseJob && isIsolated() == false) {
closeJob(); closeJob();
} }
if (acquired) { if (acquired) {
@ -361,16 +363,18 @@ public class DatafeedManager {
} }
/** /**
* This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called * This stops a datafeed WITHOUT updating the corresponding persistent task. When called it
* immediately prior to shutting down a node. Then the datafeed task can remain "started", and be * will stop the datafeed from sending data to its job as quickly as possible. The caller
* relocated to a different node. Calling this method at any other time will ruin the datafeed. * must do something sensible with the corresponding persistent task. If the node is shutting
* down the task will automatically get reassigned. Otherwise the caller must take action to
* remove or reassign the persistent task, or the datafeed will be left in limbo.
*/ */
public void isolateDatafeed() { public void isolateDatafeed() {
datafeedJob.isolate(); datafeedJob.isolate();
} }
public void setRelocating() { public void setNodeIsShuttingDown() {
isRelocating = true; isNodeShuttingDown = true;
} }
private Long executeLookBack(long startTime, Long endTime) throws Exception { private Long executeLookBack(long startTime, Long endTime) throws Exception {

View File

@ -6,6 +6,10 @@ setup:
indices.create: indices.create:
index: airline-data index: airline-data
body: body:
settings:
index:
number_of_replicas: 0
number_of_shards: 1
mappings: mappings:
properties: properties:
time: time:
@ -53,10 +57,9 @@ setup:
job_id: set-upgrade-mode-job job_id: set-upgrade-mode-job
- do: - do:
headers: cluster.health:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser index: airline-data
ml.start_datafeed: wait_for_status: green
datafeed_id: set-upgrade-mode-job-datafeed
--- ---
teardown: teardown:
@ -70,6 +73,10 @@ teardown:
--- ---
"Test setting upgrade_mode to false when it is already false": "Test setting upgrade_mode to false when it is already false":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
- do: - do:
ml.set_upgrade_mode: ml.set_upgrade_mode:
enabled: false enabled: false
@ -92,6 +99,22 @@ teardown:
--- ---
"Setting upgrade_mode to enabled": "Setting upgrade_mode to enabled":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/
- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/
- do: - do:
ml.info: {} ml.info: {}
- match: { upgrade_mode: false } - match: { upgrade_mode: false }
@ -125,6 +148,22 @@ teardown:
--- ---
"Setting upgrade mode to disabled from enabled": "Setting upgrade mode to disabled from enabled":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/
- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/
- do: - do:
ml.set_upgrade_mode: ml.set_upgrade_mode:
enabled: true enabled: true