[ML][Data frame] fixing failure state transitions and race condition (#45627) (#45656)

* [ML][Data frame] fixing failure state transitions and race condition (#45627)

There is a small window for a race condition while we are flagging a task as failed.

Here are the steps where the race condition occurs:
1. A failure occurs
2. Before `AsyncTwoPhaseIndexer` calls the `onFailure` handler it does the following:
   a. `finishAndSetState()`, which sets the `IndexerState` to `STARTED`
   b. `doSaveState(...)`, which attempts to save the current state of the indexer
3. Another trigger is fired BEFORE `onFailure` can fire, but AFTER `finishAndSetState()` occurs.

The trick here is that we will eventually set the indexer to failed, but possibly not before another trigger has had the opportunity to fire. This can cause unexpected state interactions. To combat this, I have added predicates that verify the state before taking action, so that if the state is indeed marked failed, the "second trigger" stops as soon as possible (see the sketch below).
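As a simplified sketch of that guard (based on the `doNextSearch` change further down in this diff; trimmed to the relevant lines, not the complete method):

```java
// Sketch: each indexer phase checks the task-level state first, so a trigger that
// slipped in during the failure window bails out immediately instead of doing work.
@Override
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
    if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
        logger.debug("[{}] attempted to search while failed.", transformId);
        nextPhase.onFailure(new ElasticsearchException(
            "Attempted to do a search request for failed transform [{}].", transformId));
        return;
    }
    // normal search execution continues here
    ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN,
        client, SearchAction.INSTANCE, request, nextPhase);
}
```

The same pattern is applied to `doNextBulk`, `doSaveState` and `onStart`, so every phase of the indexer re-checks the failure flag before doing any work.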

Additionally, I moved the task state checks INTO the `start` and `stop` methods, which now require a `force` parameter. `start`, `stop`, `trigger` and `markAsFailed` are all `synchronized`. This should give us some guarantee that one will not switch states out from under another.
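For illustration, a condensed sketch of the new entry-point check (simplified from the `DataFrameTransformTask.start` change in the diff below):

```java
// Sketch: start is synchronized and refuses to run a FAILED task unless force is set.
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
    if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
        listener.onFailure(new ElasticsearchStatusException(
            DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
                getTransformId(), stateReason.get()),
            RestStatus.CONFLICT));
        return;
    }
    // ... existing start logic (initialize the indexer, persist state, register the scheduler job)
}
```

`stop(boolean force)` performs the analogous check and throws an `ElasticsearchStatusException` with `CONFLICT` when the task is failed and `force` is false.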

I also flag the task as `failed` BEFORE we successfully write it to cluster state; this allows the task to fail more quickly. However, it does add a window where the task is "failed" but the cluster state does not yet indicate as much. The checks added in `start` and `stop` handle this "real state vs. cluster state" race condition. This has always been a problem for `_stop`, as it is not a master node action and does not always have the latest cluster state.
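A sketch of that reordering in `markAsFailed` (simplified from the diff below): the in-memory flags are set before the cluster-state write, so any trigger that fires while persistence is still in flight already observes the FAILED state.

```java
// Sketch: flag the task as FAILED up front, then persist; the listener completes either way.
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
persistStateToClusterState(newState, ActionListener.wrap(
    r -> listener.onResponse(null),
    e -> {
        logger.error("Failed to set task state as failed to cluster state", e);
        listener.onFailure(e);
    }
));
```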

closes #45609

Relates to #45562

* [ML][Data Frame] moves failure state transition for MT safety (#45676)

* [ML][Data Frame] moves failure state transition for MT safety

* removing unused imports
Benjamin Trent 2019-08-20 07:30:17 -05:00 committed by GitHub
parent 041385559c
commit 88641a08af
12 changed files with 230 additions and 98 deletions

@ -34,6 +34,10 @@ public class DataFrameMessages {
public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
" Use force stop to stop the data frame transform.";
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
"Use force start to restart data frame transform once error is resolved.";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
"Failed to reload data frame transform configuration for transform [{0}]";

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
@ -34,24 +35,40 @@ public class StartDataFrameTransformTaskAction extends ActionType<StartDataFrame
public static class Request extends BaseTasksRequest<Request> {
private final String id;
private final boolean force;
public Request(String id) {
public Request(String id, boolean force) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.force = force;
}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
force = in.readBoolean();
} else {
// The behavior before V_7_4_0 was that this flag did not exist,
// assuming previous checks allowed this task to be started.
force = true;
}
}
public String getId() {
return id;
}
public boolean isForce() {
return force;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeBoolean(force);
}
}
@Override
@ -66,7 +83,7 @@ public class StartDataFrameTransformTaskAction extends ActionType<StartDataFrame
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}
@Override
@ -78,7 +95,7 @@ public class StartDataFrameTransformTaskAction extends ActionType<StartDataFrame
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && force == other.force;
}
}

@ -160,10 +160,8 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
} else {
finishAndSetState();
}
}, e -> {
finishAndSetState();
onFailure(e);
}));
},
this::finishWithFailure));
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
@ -250,8 +248,9 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
/**
* Called when a failure occurs in an async job causing the execution to stop.
*
* @param exc
* The exception
* This is called before the internal state changes from the state in which the failure occurred.
*
* @param exc The exception
*/
protected abstract void onFailure(Exception exc);
@ -279,12 +278,19 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
private void finishWithSearchFailure(Exception exc) {
stats.incrementSearchFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
onFailure(exc);
doSaveState(finishAndSetState(), position.get(), () -> {});
}
private void finishWithIndexingFailure(Exception exc) {
stats.incrementIndexingFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
onFailure(exc);
doSaveState(finishAndSetState(), position.get(), () -> {});
}
private void finishWithFailure(Exception exc) {
onFailure(exc);
finishAndSetState();
}
private IndexerState finishAndSetState() {
@ -390,8 +396,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
nextSearch(listener);
} catch (Exception e) {
finishAndSetState();
onFailure(e);
finishWithFailure(e);
}
}
} catch (Exception e) {

@ -51,7 +51,7 @@ public class DataFrameMessagesTests extends ESTestCase {
try {
innerAssertSingleMessage(message);
} catch (Exception e) {
fail(e.getMessage());
fail("message: " + message + " failure: " + e.getMessage());
}
}

@ -13,7 +13,7 @@ public class StartDataFrameTransformTaskActionRequestTests extends
AbstractWireSerializingTestCase<StartDataFrameTransformTaskAction.Request> {
@Override
protected StartDataFrameTransformTaskAction.Request createTestInstance() {
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4));
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean());
}
@Override

@ -37,9 +37,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
@Before
public void setClusterSettings() throws IOException {
// Make sure we never retry on failure to speed up the test
// Set logging level to trace
// see: https://github.com/elastic/elasticsearch/issues/45562
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
addFailureRetrySetting.setJsonEntity(
"{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}");
"{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," +
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
client().performRequest(addFailureRetrySetting);
}
@ -84,7 +89,6 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45609")
public void testForceStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
@ -100,13 +104,16 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
"as it is in a failed state with failure: [" + failureReason +
"]. Use force start to restart data frame transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
assertBusy(() -> {
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" +
failureReason +
"]. Use force start to restart data frame transform once error is resolved."));
equalTo(expectedFailure));
}, 60, TimeUnit.SECONDS);
// Correct the failure by deleting the destination index
deleteIndex(dataFrameIndex);

@ -160,7 +160,7 @@ public class TransportStartDataFrameTransformAction extends
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
StartDataFrameTransformTaskAction.INSTANCE,
new StartDataFrameTransformTaskAction.Request(request.getId()),
new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
listener::onFailure));

@ -59,7 +59,8 @@ public class TransportStartDataFrameTransformTaskAction extends
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(null, listener);
//TODO fix bug as .start where it was failed could result in a null current checkpoint?
transformTask.start(null, request.isForce(), listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
@ -19,6 +20,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -30,6 +32,8 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
@ -37,16 +41,15 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM;
public class TransportStopDataFrameTransformAction extends
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
public class TransportStopDataFrameTransformAction extends TransportTasksAction<DataFrameTransformTask, Request, Response, Response> {
private static final Logger logger = LogManager.getLogger(TransportStopDataFrameTransformAction.class);
@ -61,8 +64,8 @@ public class TransportStopDataFrameTransformAction extends
PersistentTasksService persistentTasksService,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
Client client) {
super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new,
Response::new, Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.persistentTasksService = persistentTasksService;
@ -96,8 +99,7 @@ public class TransportStopDataFrameTransformAction extends
}
@Override
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
@ -106,10 +108,10 @@ public class TransportStopDataFrameTransformAction extends
listener.onFailure(new MasterNotDiscoveredException("no known master node"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new));
new ActionListenerResponseHandler<>(listener, Response::new));
}
} else {
final ActionListener<StopDataFrameTransformAction.Response> finalListener;
final ActionListener<Response> finalListener;
if (request.waitForCompletion()) {
finalListener = waitForStopListener(request, listener);
} else {
@ -131,8 +133,7 @@ public class TransportStopDataFrameTransformAction extends
}
@Override
protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StopDataFrameTransformAction.Response> listener) {
protected void taskOperation(Request request, DataFrameTransformTask transformTask, ActionListener<Response> listener) {
Set<String> ids = request.getExpandedIds();
if (ids == null) {
@ -141,20 +142,13 @@ public class TransportStopDataFrameTransformAction extends
}
if (ids.contains(transformTask.getTransformId())) {
// This should not occur as we check that none of the tasks are in a failed state earlier
// Keep this check in here for insurance.
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
request.getId(),
transformTask.getState().getReason()),
RestStatus.CONFLICT));
try {
transformTask.stop(request.isForce());
} catch (ElasticsearchException ex) {
listener.onFailure(ex);
return;
}
transformTask.stop();
listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
listener.onResponse(new Response(Boolean.TRUE));
} else {
listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));
@ -162,24 +156,22 @@ public class TransportStopDataFrameTransformAction extends
}
@Override
protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransformAction.Request request,
List<StopDataFrameTransformAction.Response> tasks, List<TaskOperationFailure> taskOperationFailures,
protected StopDataFrameTransformAction.Response newResponse(Request request,
List<Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
if (taskOperationFailures.isEmpty() == false || failedNodeExceptions.isEmpty() == false) {
return new StopDataFrameTransformAction.Response(taskOperationFailures, failedNodeExceptions, false);
return new Response(taskOperationFailures, failedNodeExceptions, false);
}
// if tasks is empty allMatch is 'vacuously satisfied'
boolean allAcknowledged = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isAcknowledged);
return new StopDataFrameTransformAction.Response(allAcknowledged);
return new Response(tasks.stream().allMatch(Response::isAcknowledged));
}
private ActionListener<StopDataFrameTransformAction.Response>
waitForStopListener(StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
private ActionListener<Response> waitForStopListener(Request request, ActionListener<Response> listener) {
ActionListener<StopDataFrameTransformAction.Response> onStopListener = ActionListener.wrap(
ActionListener<Response> onStopListener = ActionListener.wrap(
waitResponse ->
client.admin()
.indices()
@ -198,37 +190,70 @@ public class TransportStopDataFrameTransformAction extends
// Wait until the persistent task is stopped
// Switch over to Generic threadpool so we don't block the network thread
threadPool.generic().execute(() ->
waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), onStopListener));
waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), request.isForce(), onStopListener));
},
listener::onFailure
);
}
private void waitForDataFrameStopped(Collection<String> persistentTaskIds, TimeValue timeout,
ActionListener<StopDataFrameTransformAction.Response> listener) {
private void waitForDataFrameStopped(Set<String> persistentTaskIds,
TimeValue timeout,
boolean force,
ActionListener<Response> listener) {
// This map is accessed in the predicate and the listener callbacks
final Map<String, ElasticsearchException> exceptions = new ConcurrentHashMap<>();
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
if (persistentTasksCustomMetaData == null) {
return true;
}
for (String persistentTaskId : persistentTaskIds) {
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
PersistentTasksCustomMetaData.PersistentTask<?> transformsTask = persistentTasksCustomMetaData.getTask(persistentTaskId);
// Either the task has successfully stopped or we have seen that it has failed
if (transformsTask == null || exceptions.containsKey(persistentTaskId)) {
continue;
}
// If force is true, then it should eventually go away, don't add it to the collection of failures.
DataFrameTransformState taskState = (DataFrameTransformState)transformsTask.getState();
if (force == false && taskState != null && taskState.getTaskState() == DataFrameTransformTaskState.FAILED) {
exceptions.put(persistentTaskId, new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
persistentTaskId,
taskState.getReason()),
RestStatus.CONFLICT));
// If all the tasks are now flagged as failed, do not wait for another ClusterState update.
// Return to the caller as soon as possible
return persistentTasksCustomMetaData.tasks().stream().allMatch(p -> exceptions.containsKey(p.getId()));
}
return false;
}
}
return true;
}, timeout, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
}, timeout, ActionListener.wrap(
r -> {
// No exceptions AND the tasks have gone away
if (exceptions.isEmpty()) {
listener.onResponse(new Response(Boolean.TRUE));
return;
}
// We are only stopping one task, so if there is a failure, it is the only one
if (persistentTaskIds.size() == 1) {
listener.onFailure(exceptions.get(persistentTaskIds.iterator().next()));
return;
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
Set<String> stoppedTasks = new HashSet<>(persistentTaskIds);
stoppedTasks.removeAll(exceptions.keySet());
String message = stoppedTasks.isEmpty() ?
"Could not stop any of the tasks as all were failed. Use force stop to stop the transforms." :
LoggerMessageFormat.format("Successfully stopped [{}] transforms. " +
"Could not stop the transforms {} as they were failed. Use force stop to stop the transforms.",
stoppedTasks.size(),
exceptions.keySet());
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.CONFLICT));
},
listener::onFailure
));
}
}

@ -299,7 +299,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
Long previousCheckpoint,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
buildTask.initializeIndexer(indexerBuilder);
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
}
private void setNumFailureRetries(int numFailureRetries) {

@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkAction;
@ -26,6 +27,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
@ -60,6 +62,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM;
import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM;
public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
@ -240,7 +245,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
* current checkpoint is not set
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, ActionListener<Response> listener) {
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState());
if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
getTransformId(),
stateReason.get()),
RestStatus.CONFLICT));
return;
}
if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
getTransformId()));
@ -289,7 +303,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
));
}
public synchronized void stop() {
public synchronized void stop(boolean force) {
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
if (getIndexer() == null) {
// If there is no indexer the task has not been triggered
// but it still needs to be stopped and removed
@ -301,8 +316,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM,
getTransformId(),
stateReason.get()),
RestStatus.CONFLICT);
}
IndexerState state = getIndexer().stop();
stateReason.set(null);
// We just don't want it to be failed if it is failed
// Either we are running, and the STATE is already started or failed
// doSaveState should transfer the state to STOPPED when it needs to.
taskState.set(DataFrameTransformTaskState.STARTED);
if (state == IndexerState.STOPPED) {
getIndexer().onStop();
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
@ -311,13 +338,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized void triggered(Event event) {
if (getIndexer() == null) {
logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
// Ignore if event is not for this job
if (event.getJobName().equals(schedulerJobName()) == false) {
return;
}
// Ignore if event is not for this job
if (event.getJobName().equals(schedulerJobName()) == false) {
if (getIndexer() == null) {
logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
return;
}
@ -388,6 +415,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping.");
auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
listener.onResponse(null);
return;
}
// If we are stopped, this means that between the failure occurring and being handled, somebody called stop
// We should just allow that stop to continue
if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) {
logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason);
listener.onResponse(null);
return;
}
// If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
// flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
listener.onResponse(null);
return;
}
auditor.error(transform.getId(), reason);
@ -396,25 +438,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
deregisterSchedulerJob();
DataFrameTransformState newState = new DataFrameTransformState(
DataFrameTransformTaskState.FAILED,
initialIndexerState,
initialPosition,
getIndexer() == null ? initialIndexerState : getIndexer().getState(),
getIndexer() == null ? initialPosition : getIndexer().getPosition(),
currentCheckpoint.get(),
reason,
getIndexer() == null ? null : getIndexer().getProgress());
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
// This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that
// we could not read the previous state information from said index.
persistStateToClusterState(newState, ActionListener.wrap(
r -> {
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
listener.onResponse(null);
},
r -> listener.onResponse(null),
e -> {
logger.error("Failed to set task state as failed to cluster state", e);
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
listener.onFailure(e);
}
));
@ -630,6 +668,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onStart(long now, ActionListener<Boolean> listener) {
if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("[{}] attempted to start while failed.", transformId);
listener.onFailure(new ElasticsearchException("Attempted to start a failed transform [{}].", transformId));
return;
}
// On each run, we need to get the total number of docs and reset the count of processed docs
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
// the progress here, and not in the executor.
@ -746,12 +789,24 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("[{}] attempted to search while failed.", transformId);
nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].",
transformId));
return;
}
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client,
SearchAction.INSTANCE, request, nextPhase);
}
@Override
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("[{}] attempted to bulk index while failed.", transformId);
nextPhase.onFailure(new ElasticsearchException("Attempted to do a bulk index request for failed transform [{}].",
transformId));
return;
}
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(),
ClientHelper.DATA_FRAME_ORIGIN,
client,
@ -788,6 +843,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition position, Runnable next) {
if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("[{}] attempted to save state and stats while failed.", transformId);
// If we are failed, we should call next to allow failure handling to occur if necessary.
next.run();
return;
}
if (indexerState.equals(IndexerState.ABORTING)) {
// If we're aborting, just invoke `next` (which is likely an onFailure handler)
next.run();
@ -831,7 +892,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
r -> {
// for auto stop shutdown the task
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
onStop();
transformTask.shutdown();
}
next.run();
@ -853,8 +913,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected void onFailure(Exception exc) {
// the failure handler must not throw an exception due to internal problems
try {
logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", exc);
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
@ -989,6 +1047,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
synchronized void handleFailure(Exception e) {
logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e);
if (handleCircuitBreakingException(e)) {
return;
}

@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@ -174,15 +175,24 @@ public class RollupIndexerStateTests extends ESTestCase {
final Function<SearchRequest, SearchResponse> searchFunction;
final Function<BulkRequest, BulkResponse> bulkFunction;
final Consumer<Exception> failureConsumer;
final BiConsumer<IndexerState, Map<String, Object>> saveStateCheck;
private CountDownLatch latch;
NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction, Consumer<Exception> failureConsumer) {
this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {});
}
NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction, Consumer<Exception> failureConsumer,
BiConsumer<IndexerState, Map<String, Object>> saveStateCheck) {
super(executor, job, initialState, initialPosition, new AtomicBoolean(randomBoolean()));
this.searchFunction = searchFunction;
this.bulkFunction = bulkFunction;
this.failureConsumer = failureConsumer;
this.saveStateCheck = saveStateCheck;
}
private CountDownLatch newLatch(int count) {
@ -220,6 +230,7 @@ public class RollupIndexerStateTests extends ESTestCase {
@Override
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
saveStateCheck.accept(state, position);
next.run();
}
@ -770,6 +781,9 @@ public class RollupIndexerStateTests extends ESTestCase {
Consumer<Exception> failureConsumer = e -> {
assertThat(e.getMessage(), equalTo("Could not identify key in agg [foo]"));
};
BiConsumer<IndexerState, Map<String, Object>> doSaveStateCheck = (indexerState, position) -> {
isFinished.set(true);
};
@ -777,7 +791,7 @@ public class RollupIndexerStateTests extends ESTestCase {
try {
NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
searchFunction, bulkFunction, failureConsumer);
searchFunction, bulkFunction, failureConsumer, doSaveStateCheck);
final CountDownLatch latch = indexer.newLatch(1);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));