From 88641a08afb9afa00fe32da94a87e9ff9871200b Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 20 Aug 2019 07:30:17 -0500 Subject: [PATCH] [ML][Data frame] fixing failure state transitions and race condition (#45627) (#45656) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [ML][Data frame] fixing failure state transitions and race condition (#45627) There is a small window for a race condition while we are flagging a task as failed. Here are the steps where the race condition occurs: 1. A failure occurs 2. Before `AsyncTwoPhaseIndexer` calls the `onFailure` handler it does the following: a. `finishAndSetState()` which sets the IndexerState to STARTED b. `doSaveState(...)` which attempts to save the current state of the indexer 3. Another trigger is fired BEFORE `onFailure` can fire, but AFTER `finishAndSetState()` occurs. The trick here is that we will eventually set the indexer to failed, but possibly not before another trigger had the opportunity to fire. This could obviously cause some weird state interactions. To combat this, I have put in some predicates to verify the state before taking actions. This is so if state is indeed marked failed, the "second trigger" stops ASAP. Additionally, I move the task state checks INTO the `start` and `stop` methods, which will now require a `force` parameter. `start`, `stop`, `trigger` and `markAsFailed` are all `synchronized`. This should gives us some guarantees that one will not switch states out from underneath another. I also flag the task as `failed` BEFORE we successfully write it to cluster state, this is to allow us to make the task fail more quickly. But, this does add the behavior where the task is "failed" but the cluster state does not indicate as much. Adding the checks in `start` and `stop` will handle this "real state vs cluster state" race condition. This has always been a problem for `_stop` as it is not a master node action and doesn’t always have the latest cluster state. closes #45609 Relates to #45562 * [ML][Data Frame] moves failure state transition for MT safety (#45676) * [ML][Data Frame] moves failure state transition for MT safety * removing unused imports --- .../core/dataframe/DataFrameMessages.java | 4 + .../StartDataFrameTransformTaskAction.java | 23 +++- .../core/indexing/AsyncTwoPhaseIndexer.java | 25 ++-- .../dataframe/DataFrameMessagesTests.java | 2 +- ...aFrameTransformTaskActionRequestTests.java | 2 +- .../DataFrameTaskFailedStateIT.java | 23 ++-- ...ransportStartDataFrameTransformAction.java | 2 +- ...portStartDataFrameTransformTaskAction.java | 3 +- ...TransportStopDataFrameTransformAction.java | 127 +++++++++++------- ...FrameTransformPersistentTasksExecutor.java | 2 +- .../transforms/DataFrameTransformTask.java | 99 +++++++++++--- .../rollup/job/RollupIndexerStateTests.java | 16 ++- 12 files changed, 230 insertions(+), 98 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 45e923de231..32f639a1fac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -34,6 +34,10 @@ public class DataFrameMessages { public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." + " Use force stop to stop the data frame transform."; + public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM = + "Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " + + "Use force start to restart data frame transform once error is resolved."; + public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION = "Failed to reload data frame transform configuration for transform [{0}]"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java index e1ebe4eb0ab..4fe87d9727f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.tasks.BaseTasksRequest; @@ -34,24 +35,40 @@ public class StartDataFrameTransformTaskAction extends ActionType { private final String id; + private final boolean force; - public Request(String id) { + public Request(String id, boolean force) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); + this.force = force; } public Request(StreamInput in) throws IOException { super(in); id = in.readString(); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + force = in.readBoolean(); + } else { + // The behavior before V_7_4_0 was that this flag did not exist, + // assuming previous checks allowed this task to be started. + force = true; + } } public String getId() { return id; } + public boolean isForce() { + return force; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeBoolean(force); + } } @Override @@ -66,7 +83,7 @@ public class StartDataFrameTransformTaskAction extends ActionType { - finishAndSetState(); - onFailure(e); - })); + }, + this::finishWithFailure)); }); logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); return true; @@ -250,8 +248,9 @@ public abstract class AsyncTwoPhaseIndexer onFailure(exc)); + onFailure(exc); + doSaveState(finishAndSetState(), position.get(), () -> {}); } private void finishWithIndexingFailure(Exception exc) { stats.incrementIndexingFailures(); - doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc)); + onFailure(exc); + doSaveState(finishAndSetState(), position.get(), () -> {}); + } + + private void finishWithFailure(Exception exc) { + onFailure(exc); + finishAndSetState(); } private IndexerState finishAndSetState() { @@ -390,8 +396,7 @@ public abstract class AsyncTwoPhaseIndexer listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); nextSearch(listener); } catch (Exception e) { - finishAndSetState(); - onFailure(e); + finishWithFailure(e); } } } catch (Exception e) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java index fc67dc8ce64..b6284af6c58 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java @@ -51,7 +51,7 @@ public class DataFrameMessagesTests extends ESTestCase { try { innerAssertSingleMessage(message); } catch (Exception e) { - fail(e.getMessage()); + fail("message: " + message + " failure: " + e.getMessage()); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java index 8d3d8e3ac78..b6fb6b94d54 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java @@ -13,7 +13,7 @@ public class StartDataFrameTransformTaskActionRequestTests extends AbstractWireSerializingTestCase { @Override protected StartDataFrameTransformTaskAction.Request createTestInstance() { - return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4)); + return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean()); } @Override diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 445c00e9b37..edd8eb44a9f 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -37,9 +37,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { @Before public void setClusterSettings() throws IOException { // Make sure we never retry on failure to speed up the test + // Set logging level to trace + // see: https://github.com/elastic/elasticsearch/issues/45562 Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); addFailureRetrySetting.setJsonEntity( - "{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}"); + "{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," + + "\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}"); client().performRequest(addFailureRetrySetting); } @@ -84,7 +89,6 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45609") public void testForceStartFailedTransform() throws Exception { String transformId = "test-force-start-failed-transform"; createReviewsIndex(REVIEWS_INDEX_NAME, 10); @@ -100,13 +104,16 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason)); + final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " + + "as it is in a failed state with failure: [" + failureReason + + "]. Use force start to restart data frame transform once error is resolved."; // Verify that we cannot start the transform when the task is in a failed state - ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); - assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); - assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" + - failureReason + - "]. Use force start to restart data frame transform once error is resolved.")); + assertBusy(() -> { + ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); + assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); + assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + equalTo(expectedFailure)); + }, 60, TimeUnit.SECONDS); // Correct the failure by deleting the destination index deleteIndex(dataFrameIndex); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index ed88ef95502..6aaf46965ea 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -160,7 +160,7 @@ public class TransportStartDataFrameTransformAction extends ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, StartDataFrameTransformTaskAction.INSTANCE, - new StartDataFrameTransformTaskAction.Request(request.getId()), + new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()), ActionListener.wrap( r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), listener::onFailure)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java index f8e3a3f1e85..9587aa1ebbf 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java @@ -59,7 +59,8 @@ public class TransportStartDataFrameTransformTaskAction extends protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, ActionListener listener) { if (transformTask.getTransformId().equals(request.getId())) { - transformTask.start(null, listener); + //TODO fix bug as .start where it was failed could result in a null current checkpoint? + transformTask.start(null, request.isForce(), listener); } else { listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 9e881c81130..1b51a4ba4c3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.dataframe.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -19,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -30,6 +32,8 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Request; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Response; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; @@ -37,16 +41,15 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; -public class TransportStopDataFrameTransformAction extends - TransportTasksAction { +public class TransportStopDataFrameTransformAction extends TransportTasksAction { private static final Logger logger = LogManager.getLogger(TransportStopDataFrameTransformAction.class); @@ -61,8 +64,8 @@ public class TransportStopDataFrameTransformAction extends PersistentTasksService persistentTasksService, DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) { - super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new, - StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME); + super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new, + Response::new, Response::new, ThreadPool.Names.SAME); this.threadPool = threadPool; this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; this.persistentTasksService = persistentTasksService; @@ -96,8 +99,7 @@ public class TransportStopDataFrameTransformAction extends } @Override - protected void doExecute(Task task, StopDataFrameTransformAction.Request request, - ActionListener listener) { + protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); final DiscoveryNodes nodes = state.nodes(); if (nodes.isLocalNodeElectedMaster() == false) { @@ -106,10 +108,10 @@ public class TransportStopDataFrameTransformAction extends listener.onFailure(new MasterNotDiscoveredException("no known master node")); } else { transportService.sendRequest(nodes.getMasterNode(), actionName, request, - new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new)); + new ActionListenerResponseHandler<>(listener, Response::new)); } } else { - final ActionListener finalListener; + final ActionListener finalListener; if (request.waitForCompletion()) { finalListener = waitForStopListener(request, listener); } else { @@ -131,8 +133,7 @@ public class TransportStopDataFrameTransformAction extends } @Override - protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask, - ActionListener listener) { + protected void taskOperation(Request request, DataFrameTransformTask transformTask, ActionListener listener) { Set ids = request.getExpandedIds(); if (ids == null) { @@ -141,20 +142,13 @@ public class TransportStopDataFrameTransformAction extends } if (ids.contains(transformTask.getTransformId())) { - // This should not occur as we check that none of the tasks are in a failed state earlier - // Keep this check in here for insurance. - if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { - listener.onFailure( - new ElasticsearchStatusException( - DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, - request.getId(), - transformTask.getState().getReason()), - RestStatus.CONFLICT)); + try { + transformTask.stop(request.isForce()); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); return; } - - transformTask.stop(); - listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); + listener.onResponse(new Response(Boolean.TRUE)); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -162,24 +156,22 @@ public class TransportStopDataFrameTransformAction extends } @Override - protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransformAction.Request request, - List tasks, List taskOperationFailures, - List failedNodeExceptions) { + protected StopDataFrameTransformAction.Response newResponse(Request request, + List tasks, + List taskOperationFailures, + List failedNodeExceptions) { if (taskOperationFailures.isEmpty() == false || failedNodeExceptions.isEmpty() == false) { - return new StopDataFrameTransformAction.Response(taskOperationFailures, failedNodeExceptions, false); + return new Response(taskOperationFailures, failedNodeExceptions, false); } // if tasks is empty allMatch is 'vacuously satisfied' - boolean allAcknowledged = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isAcknowledged); - return new StopDataFrameTransformAction.Response(allAcknowledged); + return new Response(tasks.stream().allMatch(Response::isAcknowledged)); } - private ActionListener - waitForStopListener(StopDataFrameTransformAction.Request request, - ActionListener listener) { + private ActionListener waitForStopListener(Request request, ActionListener listener) { - ActionListener onStopListener = ActionListener.wrap( + ActionListener onStopListener = ActionListener.wrap( waitResponse -> client.admin() .indices() @@ -198,37 +190,70 @@ public class TransportStopDataFrameTransformAction extends // Wait until the persistent task is stopped // Switch over to Generic threadpool so we don't block the network thread threadPool.generic().execute(() -> - waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), onStopListener)); + waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), request.isForce(), onStopListener)); }, listener::onFailure ); } - private void waitForDataFrameStopped(Collection persistentTaskIds, TimeValue timeout, - ActionListener listener) { + private void waitForDataFrameStopped(Set persistentTaskIds, + TimeValue timeout, + boolean force, + ActionListener listener) { + // This map is accessed in the predicate and the listener callbacks + final Map exceptions = new ConcurrentHashMap<>(); persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { - if (persistentTasksCustomMetaData == null) { return true; } - for (String persistentTaskId : persistentTaskIds) { - if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { - return false; + PersistentTasksCustomMetaData.PersistentTask transformsTask = persistentTasksCustomMetaData.getTask(persistentTaskId); + // Either the task has successfully stopped or we have seen that it has failed + if (transformsTask == null || exceptions.containsKey(persistentTaskId)) { + continue; } + + // If force is true, then it should eventually go away, don't add it to the collection of failures. + DataFrameTransformState taskState = (DataFrameTransformState)transformsTask.getState(); + if (force == false && taskState != null && taskState.getTaskState() == DataFrameTransformTaskState.FAILED) { + exceptions.put(persistentTaskId, new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + persistentTaskId, + taskState.getReason()), + RestStatus.CONFLICT)); + + // If all the tasks are now flagged as failed, do not wait for another ClusterState update. + // Return to the caller as soon as possible + return persistentTasksCustomMetaData.tasks().stream().allMatch(p -> exceptions.containsKey(p.getId())); + } + return false; } return true; + }, timeout, ActionListener.wrap( + r -> { + // No exceptions AND the tasks have gone away + if (exceptions.isEmpty()) { + listener.onResponse(new Response(Boolean.TRUE)); + return; + } + // We are only stopping one task, so if there is a failure, it is the only one + if (persistentTaskIds.size() == 1) { + listener.onFailure(exceptions.get(persistentTaskIds.iterator().next())); + return; + } - }, timeout, new ActionListener() { - @Override - public void onResponse(Boolean result) { - listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); - } + Set stoppedTasks = new HashSet<>(persistentTaskIds); + stoppedTasks.removeAll(exceptions.keySet()); + String message = stoppedTasks.isEmpty() ? + "Could not stop any of the tasks as all were failed. Use force stop to stop the transforms." : + LoggerMessageFormat.format("Successfully stopped [{}] transforms. " + + "Could not stop the transforms {} as they were failed. Use force stop to stop the transforms.", + stoppedTasks.size(), + exceptions.keySet()); - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT)); + }, + listener::onFailure + )); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index a75c2d4b022..78650c5baac 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -299,7 +299,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx Long previousCheckpoint, ActionListener listener) { buildTask.initializeIndexer(indexerBuilder); - buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); + buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener); } private void setNumFailureRetries(int numFailureRetries) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 974566a491a..fab87569fc5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkAction; @@ -26,6 +27,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -60,6 +62,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; + public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { @@ -240,7 +245,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S * current checkpoint is not set * @param listener Started listener */ - public synchronized void start(Long startingCheckpoint, ActionListener listener) { + public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { + logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState()); + if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM, + getTransformId(), + stateReason.get()), + RestStatus.CONFLICT)); + return; + } if (getIndexer() == null) { listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", getTransformId())); @@ -289,7 +303,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S )); } - public synchronized void stop() { + public synchronized void stop(boolean force) { + logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); if (getIndexer() == null) { // If there is no indexer the task has not been triggered // but it still needs to be stopped and removed @@ -301,8 +316,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } + if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + getTransformId(), + stateReason.get()), + RestStatus.CONFLICT); + } + IndexerState state = getIndexer().stop(); stateReason.set(null); + // We just don't want it to be failed if it is failed + // Either we are running, and the STATE is already started or failed + // doSaveState should transfer the state to STOPPED when it needs to. + taskState.set(DataFrameTransformTaskState.STARTED); if (state == IndexerState.STOPPED) { getIndexer().onStop(); getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); @@ -311,13 +338,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized void triggered(Event event) { - if (getIndexer() == null) { - logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); + // Ignore if event is not for this job + if (event.getJobName().equals(schedulerJobName()) == false) { return; } - // Ignore if event is not for this job - if (event.getJobName().equals(schedulerJobName()) == false) { + if (getIndexer() == null) { + logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); return; } @@ -388,6 +415,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) { logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping."); auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state."); + listener.onResponse(null); + return; + } + // If we are stopped, this means that between the failure occurring and being handled, somebody called stop + // We should just allow that stop to continue + if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) { + logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason); + listener.onResponse(null); + return; + } + // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to + // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed. + if (taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason); + listener.onResponse(null); return; } auditor.error(transform.getId(), reason); @@ -396,25 +438,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S deregisterSchedulerJob(); DataFrameTransformState newState = new DataFrameTransformState( DataFrameTransformTaskState.FAILED, - initialIndexerState, - initialPosition, + getIndexer() == null ? initialIndexerState : getIndexer().getState(), + getIndexer() == null ? initialPosition : getIndexer().getPosition(), currentCheckpoint.get(), reason, getIndexer() == null ? null : getIndexer().getProgress()); + taskState.set(DataFrameTransformTaskState.FAILED); + stateReason.set(reason); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED - // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that + // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that // we could not read the previous state information from said index. persistStateToClusterState(newState, ActionListener.wrap( - r -> { - taskState.set(DataFrameTransformTaskState.FAILED); - stateReason.set(reason); - listener.onResponse(null); - }, + r -> listener.onResponse(null), e -> { logger.error("Failed to set task state as failed to cluster state", e); - taskState.set(DataFrameTransformTaskState.FAILED); - stateReason.set(reason); listener.onFailure(e); } )); @@ -630,6 +668,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void onStart(long now, ActionListener listener) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to start while failed.", transformId); + listener.onFailure(new ElasticsearchException("Attempted to start a failed transform [{}].", transformId)); + return; + } // On each run, we need to get the total number of docs and reset the count of processed docs // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather // the progress here, and not in the executor. @@ -746,12 +789,24 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to search while failed.", transformId); + nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", + transformId)); + return; + } ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, SearchAction.INSTANCE, request, nextPhase); } @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to bulk index while failed.", transformId); + nextPhase.onFailure(new ElasticsearchException("Attempted to do a bulk index request for failed transform [{}].", + transformId)); + return; + } ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, @@ -788,6 +843,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition position, Runnable next) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to save state and stats while failed.", transformId); + // If we are failed, we should call next to allow failure handling to occur if necessary. + next.run(); + return; + } if (indexerState.equals(IndexerState.ABORTING)) { // If we're aborting, just invoke `next` (which is likely an onFailure handler) next.run(); @@ -831,7 +892,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S r -> { // for auto stop shutdown the task if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { - onStop(); transformTask.shutdown(); } next.run(); @@ -853,8 +913,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S protected void onFailure(Exception exc) { // the failure handler must not throw an exception due to internal problems try { - logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", exc); - // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) { @@ -989,6 +1047,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } synchronized void handleFailure(Exception e) { + logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e); if (handleCircuitBreakingException(e)) { return; } @@ -1003,7 +1062,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void failIndexer(String failureMessage) { - logger.error("Data frame transform [" + getJobId() + "]:" + failureMessage); + logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage); auditor.error(transformTask.getTransformId(), failureMessage); transformTask.markAsFailed(failureMessage, ActionListener.wrap( r -> { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 62642cdd10b..468d8ce7abf 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -174,15 +175,24 @@ public class RollupIndexerStateTests extends ESTestCase { final Function searchFunction; final Function bulkFunction; final Consumer failureConsumer; + final BiConsumer> saveStateCheck; private CountDownLatch latch; NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer) { + this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {}); + } + + NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + Map initialPosition, Function searchFunction, + Function bulkFunction, Consumer failureConsumer, + BiConsumer> saveStateCheck) { super(executor, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.failureConsumer = failureConsumer; + this.saveStateCheck = saveStateCheck; } private CountDownLatch newLatch(int count) { @@ -220,6 +230,7 @@ public class RollupIndexerStateTests extends ESTestCase { @Override protected void doSaveState(IndexerState state, Map position, Runnable next) { assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED; + saveStateCheck.accept(state, position); next.run(); } @@ -770,6 +781,9 @@ public class RollupIndexerStateTests extends ESTestCase { Consumer failureConsumer = e -> { assertThat(e.getMessage(), equalTo("Could not identify key in agg [foo]")); + }; + + BiConsumer> doSaveStateCheck = (indexerState, position) -> { isFinished.set(true); }; @@ -777,7 +791,7 @@ public class RollupIndexerStateTests extends ESTestCase { try { NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, - searchFunction, bulkFunction, failureConsumer); + searchFunction, bulkFunction, failureConsumer, doSaveStateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));