[ML][Data Frame] adding dynamic cluster setting for failure retries (#44577) (#44639)

This adds a new dynamic cluster setting `xpack.data_frame.num_transform_failure_retries`.

This setting indicates how many times non-critical failures are retried before a data frame transform is marked as failed and stops executing. At the time of this commit, the bounds are: min 0, max 100, default 10.
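For illustration only (this request is not part of the commit), the new setting can be changed at runtime through the cluster update settings API, for example:

PUT _cluster/settings
{
  "persistent": {
    "xpack.data_frame.num_transform_failure_retries": 5
  }
}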
Benjamin Trent, 2019-07-19 16:17:39 -05:00, committed by GitHub
parent f193d14764
commit 2e303fc5f7
6 changed files with 79 additions and 6 deletions


@@ -0,0 +1,40 @@
[role="xpack"]
[[data-frames-settings]]
=== {dataframe-transforms-cap} settings in Elasticsearch
[subs="attributes"]
++++
<titleabbrev>{dataframe-transforms-cap} settings</titleabbrev>
++++
You do not need to configure any settings to use {dataframe-transforms}. It is enabled by default.
All of these settings can be added to the `elasticsearch.yml` configuration file.
The dynamic settings can also be updated across a cluster with the
<<cluster-update-settings,cluster update settings API>>.
TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml`
file.
[float]
[[general-data-frames-settings]]
==== General {dataframe-transforms} settings
`xpack.data_frame.enabled`::
Set to `true` (default) to enable {dataframe-transforms} on the node. +
+
If set to `false` in `elasticsearch.yml`, the {dataframe-transform} APIs are disabled on the node.
Therefore, the node cannot start or administer transforms, nor receive transport (internal)
communication requests related to the {dataframe-transform} APIs.
+
IMPORTANT: If you want to use {dataframe-transform} features in your cluster, you must have
`xpack.data_frame.enabled` set to `true` on all master-eligible nodes. This is the
default behavior.
`xpack.data_frame.num_transform_failure_retries` (<<cluster-update-settings,Dynamic>>)::
The number of times that a {dataframe-transform} retries when it experiences a
non-fatal error. Once the number of retries is exhausted, the {dataframe-transform}
task will be marked as `failed`. The default value is `10` with a valid minimum of `0`
and maximum of `100`.
If a {dataframe-transform} is already running, it will have to be restarted
to use the changed setting.
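As noted above, a {dataframe-transform} that is already running only picks up a changed retry value after a restart. An illustrative stop/start cycle (the transform ID `my_transform` is a placeholder; these requests are not part of the committed file):

POST _data_frame/transforms/my_transform/_stop

POST _data_frame/transforms/my_transform/_start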


@@ -51,6 +51,8 @@ include::settings/audit-settings.asciidoc[]
include::settings/ccr-settings.asciidoc[]
include::settings/data-frames-settings.asciidoc[]
include::settings/ilm-settings.asciidoc[]
include::settings/license-settings.asciidoc[]


@@ -62,7 +62,7 @@ public class XPackSettings {
/** Setting for enabling or disabling graph. Defaults to true. */
public static final Setting<Boolean> GRAPH_ENABLED = Setting.boolSetting("xpack.graph.enabled", true, Setting.Property.NodeScope);
/** Setting for enabling or disabling machine learning. Defaults to false. */
/** Setting for enabling or disabling machine learning. Defaults to true. */
public static final Setting<Boolean> MACHINE_LEARNING_ENABLED = Setting.boolSetting("xpack.ml.enabled", true,
Setting.Property.NodeScope);


@@ -20,6 +20,7 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
@@ -71,6 +72,7 @@ import org.elasticsearch.xpack.dataframe.rest.action.RestPutDataFrameTransformAc
import org.elasticsearch.xpack.dataframe.rest.action.RestStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestStopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformPersistentTasksExecutor;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.io.IOException;
import java.time.Clock;
@@ -221,6 +223,10 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), dataFrameAuditor.get(), threadPool));
}
public List<Setting<?>> getSettings() {
return Collections.singletonList(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING);
}
@Override
public void close() {
if (schedulerEngine.get() != null) {


@@ -32,6 +32,7 @@ public class TransportStartDataFrameTransformTaskAction extends
TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformTaskAction.Request,
StartDataFrameTransformTaskAction.Response, StartDataFrameTransformTaskAction.Response> {
private volatile int numFailureRetries;
private final XPackLicenseState licenseState;
@Inject
@@ -41,6 +42,8 @@ public class TransportStartDataFrameTransformTaskAction extends
StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new,
StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME);
this.licenseState = licenseState;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
}
@Override
@@ -59,7 +62,7 @@ public class TransportStartDataFrameTransformTaskAction extends
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(null, listener);
transformTask.setNumFailureRetries(numFailureRetries).start(null, listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));
@@ -90,4 +93,8 @@ public class TransportStartDataFrameTransformTaskAction extends
boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted);
return new StartDataFrameTransformTaskAction.Response(allStarted);
}
void setNumFailureRetries(int numFailureRetries) {
this.numFailureRetries = numFailureRetries;
}
}


@@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -63,8 +64,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// Default interval the scheduler sends an event if the config does not specify a frequency
private static final long SCHEDULER_NEXT_MILLISECONDS = 60000;
private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class);
// TODO consider moving to dynamic cluster setting
private static final int MAX_CONTINUOUS_FAILURES = 10;
private static final int DEFAULT_FAILURE_RETRIES = 10;
private volatile int numFailureRetries = DEFAULT_FAILURE_RETRIES;
// How many times the transform task can retry on a non-critical failure
public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(
"xpack.data_frame.num_transform_failure_retries",
DEFAULT_FAILURE_RETRIES,
0,
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic);
private static final IndexerState[] RUNNING_STATES = new IndexerState[]{IndexerState.STARTED, IndexerState.INDEXING};
public static final String SCHEDULE_NAME = DataFrameField.TASK_NAME + "/schedule";
@@ -347,6 +356,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
}
public DataFrameTransformTask setNumFailureRetries(int numFailureRetries) {
this.numFailureRetries = numFailureRetries;
return this;
}
public int getNumFailureRetries() {
return numFailureRetries;
}
private void registerWithSchedulerJob() {
schedulerEngine.register(this);
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next());
@@ -828,10 +846,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > transformTask.getNumFailureRetries()) {
String failureMessage = isIrrecoverableFailure(e) ?
"task encountered irrecoverable failure: " + e.getMessage() :
"task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
"task encountered more than " + transformTask.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
failIndexer(failureMessage);
}
}