diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 45e923de231..32f639a1fac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -34,6 +34,10 @@ public class DataFrameMessages { public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." + " Use force stop to stop the data frame transform."; + public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM = + "Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " + + "Use force start to restart data frame transform once error is resolved."; + public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION = "Failed to reload data frame transform configuration for transform [{0}]"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java index e1ebe4eb0ab..4fe87d9727f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.tasks.BaseTasksRequest; @@ -34,24 +35,40 @@ public class StartDataFrameTransformTaskAction extends ActionType { private final String id; + private final boolean force; - public Request(String id) { + public Request(String id, boolean force) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); + this.force = force; } public Request(StreamInput in) throws IOException { super(in); id = in.readString(); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + force = in.readBoolean(); + } else { + // The behavior before V_7_4_0 was that this flag did not exist, + // assuming previous checks allowed this task to be started. + force = true; + } } public String getId() { return id; } + public boolean isForce() { + return force; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeBoolean(force); + } } @Override @@ -66,7 +83,7 @@ public class StartDataFrameTransformTaskAction extends ActionType { - finishAndSetState(); - onFailure(e); - })); + }, + this::finishWithFailure)); }); logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); return true; @@ -250,8 +248,9 @@ public abstract class AsyncTwoPhaseIndexer onFailure(exc)); + onFailure(exc); + doSaveState(finishAndSetState(), position.get(), () -> {}); } private void finishWithIndexingFailure(Exception exc) { stats.incrementIndexingFailures(); - doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc)); + onFailure(exc); + doSaveState(finishAndSetState(), position.get(), () -> {}); + } + + private void finishWithFailure(Exception exc) { + onFailure(exc); + finishAndSetState(); } private IndexerState finishAndSetState() { @@ -390,8 +396,7 @@ public abstract class AsyncTwoPhaseIndexer listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); nextSearch(listener); } catch (Exception e) { - finishAndSetState(); - onFailure(e); + finishWithFailure(e); } } } catch (Exception e) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java index fc67dc8ce64..b6284af6c58 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java @@ -51,7 +51,7 @@ public class DataFrameMessagesTests extends ESTestCase { try { innerAssertSingleMessage(message); } catch (Exception e) { - fail(e.getMessage()); + fail("message: " + message + " failure: " + e.getMessage()); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java index 8d3d8e3ac78..b6fb6b94d54 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java @@ -13,7 +13,7 @@ public class StartDataFrameTransformTaskActionRequestTests extends AbstractWireSerializingTestCase { @Override protected StartDataFrameTransformTaskAction.Request createTestInstance() { - return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4)); + return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean()); } @Override diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index 445c00e9b37..edd8eb44a9f 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -37,9 +37,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { @Before public void setClusterSettings() throws IOException { // Make sure we never retry on failure to speed up the test + // Set logging level to trace + // see: https://github.com/elastic/elasticsearch/issues/45562 Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); addFailureRetrySetting.setJsonEntity( - "{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}"); + "{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," + + "\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}"); client().performRequest(addFailureRetrySetting); } @@ -84,7 +89,6 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45609") public void testForceStartFailedTransform() throws Exception { String transformId = "test-force-start-failed-transform"; createReviewsIndex(REVIEWS_INDEX_NAME, 10); @@ -100,13 +104,16 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { // Verify we have failed for the expected reason assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason)); + final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " + + "as it is in a failed state with failure: [" + failureReason + + "]. Use force start to restart data frame transform once error is resolved."; // Verify that we cannot start the transform when the task is in a failed state - ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); - assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); - assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" + - failureReason + - "]. Use force start to restart data frame transform once error is resolved.")); + assertBusy(() -> { + ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); + assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); + assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + equalTo(expectedFailure)); + }, 60, TimeUnit.SECONDS); // Correct the failure by deleting the destination index deleteIndex(dataFrameIndex); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index ed88ef95502..6aaf46965ea 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -160,7 +160,7 @@ public class TransportStartDataFrameTransformAction extends ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, StartDataFrameTransformTaskAction.INSTANCE, - new StartDataFrameTransformTaskAction.Request(request.getId()), + new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()), ActionListener.wrap( r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), listener::onFailure)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java index f8e3a3f1e85..9587aa1ebbf 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java @@ -59,7 +59,8 @@ public class TransportStartDataFrameTransformTaskAction extends protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, ActionListener listener) { if (transformTask.getTransformId().equals(request.getId())) { - transformTask.start(null, listener); + //TODO fix bug as .start where it was failed could result in a null current checkpoint? + transformTask.start(null, request.isForce(), listener); } else { listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 9e881c81130..1b51a4ba4c3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.dataframe.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -19,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -30,6 +32,8 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Request; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Response; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; @@ -37,16 +41,15 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; -public class TransportStopDataFrameTransformAction extends - TransportTasksAction { +public class TransportStopDataFrameTransformAction extends TransportTasksAction { private static final Logger logger = LogManager.getLogger(TransportStopDataFrameTransformAction.class); @@ -61,8 +64,8 @@ public class TransportStopDataFrameTransformAction extends PersistentTasksService persistentTasksService, DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) { - super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new, - StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME); + super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new, + Response::new, Response::new, ThreadPool.Names.SAME); this.threadPool = threadPool; this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; this.persistentTasksService = persistentTasksService; @@ -96,8 +99,7 @@ public class TransportStopDataFrameTransformAction extends } @Override - protected void doExecute(Task task, StopDataFrameTransformAction.Request request, - ActionListener listener) { + protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); final DiscoveryNodes nodes = state.nodes(); if (nodes.isLocalNodeElectedMaster() == false) { @@ -106,10 +108,10 @@ public class TransportStopDataFrameTransformAction extends listener.onFailure(new MasterNotDiscoveredException("no known master node")); } else { transportService.sendRequest(nodes.getMasterNode(), actionName, request, - new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new)); + new ActionListenerResponseHandler<>(listener, Response::new)); } } else { - final ActionListener finalListener; + final ActionListener finalListener; if (request.waitForCompletion()) { finalListener = waitForStopListener(request, listener); } else { @@ -131,8 +133,7 @@ public class TransportStopDataFrameTransformAction extends } @Override - protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask, - ActionListener listener) { + protected void taskOperation(Request request, DataFrameTransformTask transformTask, ActionListener listener) { Set ids = request.getExpandedIds(); if (ids == null) { @@ -141,20 +142,13 @@ public class TransportStopDataFrameTransformAction extends } if (ids.contains(transformTask.getTransformId())) { - // This should not occur as we check that none of the tasks are in a failed state earlier - // Keep this check in here for insurance. - if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { - listener.onFailure( - new ElasticsearchStatusException( - DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, - request.getId(), - transformTask.getState().getReason()), - RestStatus.CONFLICT)); + try { + transformTask.stop(request.isForce()); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); return; } - - transformTask.stop(); - listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); + listener.onResponse(new Response(Boolean.TRUE)); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -162,24 +156,22 @@ public class TransportStopDataFrameTransformAction extends } @Override - protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransformAction.Request request, - List tasks, List taskOperationFailures, - List failedNodeExceptions) { + protected StopDataFrameTransformAction.Response newResponse(Request request, + List tasks, + List taskOperationFailures, + List failedNodeExceptions) { if (taskOperationFailures.isEmpty() == false || failedNodeExceptions.isEmpty() == false) { - return new StopDataFrameTransformAction.Response(taskOperationFailures, failedNodeExceptions, false); + return new Response(taskOperationFailures, failedNodeExceptions, false); } // if tasks is empty allMatch is 'vacuously satisfied' - boolean allAcknowledged = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isAcknowledged); - return new StopDataFrameTransformAction.Response(allAcknowledged); + return new Response(tasks.stream().allMatch(Response::isAcknowledged)); } - private ActionListener - waitForStopListener(StopDataFrameTransformAction.Request request, - ActionListener listener) { + private ActionListener waitForStopListener(Request request, ActionListener listener) { - ActionListener onStopListener = ActionListener.wrap( + ActionListener onStopListener = ActionListener.wrap( waitResponse -> client.admin() .indices() @@ -198,37 +190,70 @@ public class TransportStopDataFrameTransformAction extends // Wait until the persistent task is stopped // Switch over to Generic threadpool so we don't block the network thread threadPool.generic().execute(() -> - waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), onStopListener)); + waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), request.isForce(), onStopListener)); }, listener::onFailure ); } - private void waitForDataFrameStopped(Collection persistentTaskIds, TimeValue timeout, - ActionListener listener) { + private void waitForDataFrameStopped(Set persistentTaskIds, + TimeValue timeout, + boolean force, + ActionListener listener) { + // This map is accessed in the predicate and the listener callbacks + final Map exceptions = new ConcurrentHashMap<>(); persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { - if (persistentTasksCustomMetaData == null) { return true; } - for (String persistentTaskId : persistentTaskIds) { - if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { - return false; + PersistentTasksCustomMetaData.PersistentTask transformsTask = persistentTasksCustomMetaData.getTask(persistentTaskId); + // Either the task has successfully stopped or we have seen that it has failed + if (transformsTask == null || exceptions.containsKey(persistentTaskId)) { + continue; } + + // If force is true, then it should eventually go away, don't add it to the collection of failures. + DataFrameTransformState taskState = (DataFrameTransformState)transformsTask.getState(); + if (force == false && taskState != null && taskState.getTaskState() == DataFrameTransformTaskState.FAILED) { + exceptions.put(persistentTaskId, new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + persistentTaskId, + taskState.getReason()), + RestStatus.CONFLICT)); + + // If all the tasks are now flagged as failed, do not wait for another ClusterState update. + // Return to the caller as soon as possible + return persistentTasksCustomMetaData.tasks().stream().allMatch(p -> exceptions.containsKey(p.getId())); + } + return false; } return true; + }, timeout, ActionListener.wrap( + r -> { + // No exceptions AND the tasks have gone away + if (exceptions.isEmpty()) { + listener.onResponse(new Response(Boolean.TRUE)); + return; + } + // We are only stopping one task, so if there is a failure, it is the only one + if (persistentTaskIds.size() == 1) { + listener.onFailure(exceptions.get(persistentTaskIds.iterator().next())); + return; + } - }, timeout, new ActionListener() { - @Override - public void onResponse(Boolean result) { - listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); - } + Set stoppedTasks = new HashSet<>(persistentTaskIds); + stoppedTasks.removeAll(exceptions.keySet()); + String message = stoppedTasks.isEmpty() ? + "Could not stop any of the tasks as all were failed. Use force stop to stop the transforms." : + LoggerMessageFormat.format("Successfully stopped [{}] transforms. " + + "Could not stop the transforms {} as they were failed. Use force stop to stop the transforms.", + stoppedTasks.size(), + exceptions.keySet()); - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT)); + }, + listener::onFailure + )); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index a75c2d4b022..78650c5baac 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -299,7 +299,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx Long previousCheckpoint, ActionListener listener) { buildTask.initializeIndexer(indexerBuilder); - buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener); + buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener); } private void setNumFailureRetries(int numFailureRetries) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 974566a491a..fab87569fc5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkAction; @@ -26,6 +27,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -60,6 +62,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM; +import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; + public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { @@ -240,7 +245,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S * current checkpoint is not set * @param listener Started listener */ - public synchronized void start(Long startingCheckpoint, ActionListener listener) { + public synchronized void start(Long startingCheckpoint, boolean force, ActionListener listener) { + logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState()); + if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM, + getTransformId(), + stateReason.get()), + RestStatus.CONFLICT)); + return; + } if (getIndexer() == null) { listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", getTransformId())); @@ -289,7 +303,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S )); } - public synchronized void stop() { + public synchronized void stop(boolean force) { + logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); if (getIndexer() == null) { // If there is no indexer the task has not been triggered // but it still needs to be stopped and removed @@ -301,8 +316,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } + if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM, + getTransformId(), + stateReason.get()), + RestStatus.CONFLICT); + } + IndexerState state = getIndexer().stop(); stateReason.set(null); + // We just don't want it to be failed if it is failed + // Either we are running, and the STATE is already started or failed + // doSaveState should transfer the state to STOPPED when it needs to. + taskState.set(DataFrameTransformTaskState.STARTED); if (state == IndexerState.STOPPED) { getIndexer().onStop(); getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); @@ -311,13 +338,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized void triggered(Event event) { - if (getIndexer() == null) { - logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); + // Ignore if event is not for this job + if (event.getJobName().equals(schedulerJobName()) == false) { return; } - // Ignore if event is not for this job - if (event.getJobName().equals(schedulerJobName()) == false) { + if (getIndexer() == null) { + logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); return; } @@ -388,6 +415,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) { logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping."); auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state."); + listener.onResponse(null); + return; + } + // If we are stopped, this means that between the failure occurring and being handled, somebody called stop + // We should just allow that stop to continue + if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) { + logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason); + listener.onResponse(null); + return; + } + // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to + // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed. + if (taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason); + listener.onResponse(null); return; } auditor.error(transform.getId(), reason); @@ -396,25 +438,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S deregisterSchedulerJob(); DataFrameTransformState newState = new DataFrameTransformState( DataFrameTransformTaskState.FAILED, - initialIndexerState, - initialPosition, + getIndexer() == null ? initialIndexerState : getIndexer().getState(), + getIndexer() == null ? initialPosition : getIndexer().getPosition(), currentCheckpoint.get(), reason, getIndexer() == null ? null : getIndexer().getProgress()); + taskState.set(DataFrameTransformTaskState.FAILED); + stateReason.set(reason); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED - // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that + // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that // we could not read the previous state information from said index. persistStateToClusterState(newState, ActionListener.wrap( - r -> { - taskState.set(DataFrameTransformTaskState.FAILED); - stateReason.set(reason); - listener.onResponse(null); - }, + r -> listener.onResponse(null), e -> { logger.error("Failed to set task state as failed to cluster state", e); - taskState.set(DataFrameTransformTaskState.FAILED); - stateReason.set(reason); listener.onFailure(e); } )); @@ -630,6 +668,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void onStart(long now, ActionListener listener) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to start while failed.", transformId); + listener.onFailure(new ElasticsearchException("Attempted to start a failed transform [{}].", transformId)); + return; + } // On each run, we need to get the total number of docs and reset the count of processed docs // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather // the progress here, and not in the executor. @@ -746,12 +789,24 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to search while failed.", transformId); + nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", + transformId)); + return; + } ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, SearchAction.INSTANCE, request, nextPhase); } @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to bulk index while failed.", transformId); + nextPhase.onFailure(new ElasticsearchException("Attempted to do a bulk index request for failed transform [{}].", + transformId)); + return; + } ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, @@ -788,6 +843,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition position, Runnable next) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("[{}] attempted to save state and stats while failed.", transformId); + // If we are failed, we should call next to allow failure handling to occur if necessary. + next.run(); + return; + } if (indexerState.equals(IndexerState.ABORTING)) { // If we're aborting, just invoke `next` (which is likely an onFailure handler) next.run(); @@ -831,7 +892,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S r -> { // for auto stop shutdown the task if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { - onStop(); transformTask.shutdown(); } next.run(); @@ -853,8 +913,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S protected void onFailure(Exception exc) { // the failure handler must not throw an exception due to internal problems try { - logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", exc); - // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) { @@ -989,6 +1047,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } synchronized void handleFailure(Exception e) { + logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e); if (handleCircuitBreakingException(e)) { return; } @@ -1003,7 +1062,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void failIndexer(String failureMessage) { - logger.error("Data frame transform [" + getJobId() + "]:" + failureMessage); + logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage); auditor.error(transformTask.getTransformId(), failureMessage); transformTask.markAsFailed(failureMessage, ActionListener.wrap( r -> { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 62642cdd10b..468d8ce7abf 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -174,15 +175,24 @@ public class RollupIndexerStateTests extends ESTestCase { final Function searchFunction; final Function bulkFunction; final Consumer failureConsumer; + final BiConsumer> saveStateCheck; private CountDownLatch latch; NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, Function searchFunction, Function bulkFunction, Consumer failureConsumer) { + this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {}); + } + + NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, + Map initialPosition, Function searchFunction, + Function bulkFunction, Consumer failureConsumer, + BiConsumer> saveStateCheck) { super(executor, job, initialState, initialPosition, new AtomicBoolean(randomBoolean())); this.searchFunction = searchFunction; this.bulkFunction = bulkFunction; this.failureConsumer = failureConsumer; + this.saveStateCheck = saveStateCheck; } private CountDownLatch newLatch(int count) { @@ -220,6 +230,7 @@ public class RollupIndexerStateTests extends ESTestCase { @Override protected void doSaveState(IndexerState state, Map position, Runnable next) { assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED; + saveStateCheck.accept(state, position); next.run(); } @@ -770,6 +781,9 @@ public class RollupIndexerStateTests extends ESTestCase { Consumer failureConsumer = e -> { assertThat(e.getMessage(), equalTo("Could not identify key in agg [foo]")); + }; + + BiConsumer> doSaveStateCheck = (indexerState, position) -> { isFinished.set(true); }; @@ -777,7 +791,7 @@ public class RollupIndexerStateTests extends ESTestCase { try { NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null, - searchFunction, bulkFunction, failureConsumer); + searchFunction, bulkFunction, failureConsumer, doSaveStateCheck); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED));