[ML] Complete the Data Frame task on stop (#41752) (#42063)

Wait for the indexer to stop, then complete the persistent task on stop.
If wait_for_completion is true, the request will not return until the transform has stopped.
David Kyle 2019-05-21 10:24:20 +01:00 committed by GitHub
parent 7b3a9c7033
commit 24144aead2
15 changed files with 249 additions and 277 deletions
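
For high-level REST client callers, the new blocking behavior looks roughly like this (a minimal sketch based on the test changes below; the transform id and timeout are illustrative):

StopDataFrameTransformRequest request =
    new StopDataFrameTransformRequest("my-transform", Boolean.TRUE, TimeValue.timeValueSeconds(30));
StopDataFrameTransformResponse response =
    highLevelClient().dataFrame().stopDataFrameTransform(request, RequestOptions.DEFAULT);
// With waitForCompletion == true this call returns only after the persistent task
// has completed; if the transform is still running when the timeout elapses, the
// request fails instead of returning early.
assert response.isStopped();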

View File

@@ -141,7 +141,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {
@@ -265,7 +266,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformResponse stopResponse =
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
assertTrue(stopResponse.isStopped());

View File

@@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
@After
public void cleanUpTransforms() throws IOException {
for (String transformId : transformsToClean) {
highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
highLevelClient().dataFrame().stopDataFrameTransform(
new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {

View File

@@ -7,25 +7,18 @@ package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransformAction.Response> {
public class DeleteDataFrameTransformAction extends Action<AcknowledgedResponse> {
public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction();
public static final String NAME = "cluster:admin/data_frame/delete";
@@ -35,17 +28,21 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
}
@Override
public Response newResponse() {
public AcknowledgedResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
public Writeable.Reader<AcknowledgedResponse> getResponseReader() {
return in -> {
AcknowledgedResponse response = new AcknowledgedResponse();
response.readFrom(in);
return response;
};
}
public static class Request extends BaseTasksRequest<Request> {
private final String id;
public static class Request extends MasterNodeRequest<Request> {
private String id;
public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
@@ -60,11 +57,6 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
return id;
}
@Override
public boolean match(Task task) {
return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -94,59 +86,4 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
return Objects.equals(id, other.id);
}
}
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private final boolean acknowledged;
public Response(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
}
public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.acknowledged = acknowledged;
}
public Response(boolean acknowledged) {
this(acknowledged, Collections.emptyList(), Collections.emptyList());
}
public boolean isDeleted() {
return acknowledged;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
toXContentCommon(builder, params);
builder.field("acknowledged", acknowledged);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DeleteDataFrameTransformAction.Response response = (DeleteDataFrameTransformAction.Response) o;
return super.equals(o) && acknowledged == response.acknowledged;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), acknowledged);
}
}
}

View File

@@ -22,9 +22,11 @@ import java.util.concurrent.atomic.AtomicReference;
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
* finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
* to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
* is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()})
* to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
* {@link #stop()} can be used to stop the background job without aborting the indexer.
*
* In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
* indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
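
To make that lifecycle concrete, here is a minimal sketch of driving a subclass through it (MyIndexer is a hypothetical subclass; the call sequence mirrors the unit tests added later in this commit):

AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
MyIndexer indexer = new MyIndexer(executor, state, initialPosition);
indexer.start();                                           // STOPPED -> STARTED
indexer.maybeTriggerAsyncJob(System.currentTimeMillis());  // STARTED -> INDEXING, job runs async
indexer.stop();                                            // INDEXING -> STOPPING; onStop() runs
                                                           // once the in-flight search returns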
@@ -84,8 +86,10 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
/**
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background. If there is no job running when this function is
* called, the state is directly set to {@link IndexerState#STOPPED}.
* running in the background, {@link #onStop()} will be called when the background job
* detects that the indexer is stopped.
* If there is no job running when this function is called,
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} is called directly.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
@@ -94,6 +98,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
return IndexerState.STOPPED;
} else {
return previousState;
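
Summarizing the transition implemented above as a comment-style truth table:

// stop() state transitions:
//   INDEXING -> STOPPING  (onStop() is deferred until the running job observes the flag)
//   STARTED  -> STOPPED   (onStop() is invoked synchronously)
//   any other state is returned unchanged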
@@ -251,6 +256,14 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
*/
protected abstract void onFinish(ActionListener<Void> listener);
/**
* Called when the indexer is stopped. This is only invoked when the indexer is stopped
* via {@link #stop()}, as opposed to {@link #onFinish(ActionListener)}, which is called
* when the indexer's work is done.
*/
protected void onStop() {
}
/**
* Called when a background job detects that the indexer is aborted causing the
* async execution to stop.
@@ -276,6 +289,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
case STOPPING:
// must be started again
onStop();
return IndexerState.STOPPED;
case ABORTING:

View File

@@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
public class DeleteDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
@@ -34,17 +35,26 @@ import static org.hamcrest.Matchers.equalTo;
public class AsyncTwoPhaseIndexerTests extends ESTestCase {
AtomicBoolean isFinished = new AtomicBoolean(false);
AtomicBoolean isStopped = new AtomicBoolean(false);
@Before
public void reset() {
isFinished.set(false);
isStopped.set(false);
}
private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
private final CountDownLatch latch;
// test the execution order
private volatile int step;
private final boolean stoppedBeforeFinished;
protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
CountDownLatch latch) {
CountDownLatch latch, boolean stoppedBeforeFinished) {
super(executor, initialState, initialPosition, new MockJobStats());
this.latch = latch;
this.stoppedBeforeFinished = stoppedBeforeFinished;
}
@Override
@@ -57,7 +67,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
awaitForLatch();
assertThat(step, equalTo(3));
++step;
return new IterationResult<Integer>(Collections.emptyList(), 3, true);
return new IterationResult<>(Collections.emptyList(), 3, true);
}
private void awaitForLatch() {
@@ -99,7 +109,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(5));
int expectedStep = stoppedBeforeFinished ? 3 : 5;
assertThat(step, equalTo(expectedStep));
++step;
next.run();
}
@@ -114,7 +125,12 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
assertThat(step, equalTo(4));
++step;
listener.onResponse(null);
isFinished.set(true);
assertTrue(isFinished.compareAndSet(false, true));
}
@Override
protected void onStop() {
assertTrue(isStopped.compareAndSet(false, true));
}
@Override
@@ -180,7 +196,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
protected void onFailure(Exception exc) {
assertThat(step, equalTo(2));
++step;
isFinished.set(true);
assertTrue(isFinished.compareAndSet(false, true));
}
@Override
@@ -209,10 +225,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
public void testStateMachine() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -220,7 +235,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
countDownLatch.countDown();
assertThat(indexer.getPosition(), equalTo(2));
ESTestCase.awaitBusy(() -> isFinished.get());
assertTrue(awaitBusy(() -> isFinished.get()));
assertFalse(isStopped.get());
assertThat(indexer.getStep(), equalTo(6));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@@ -234,18 +250,57 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
public void testStateMachineBrokenSearch() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
isFinished.set(false);
try {
MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertThat(indexer.getStep(), equalTo(3));
} finally {
executor.shutdownNow();
}
}
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));
indexer.stop();
assertTrue(isStopped.get());
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
} finally {
executor.shutdownNow();
}
}
public void testStop_WhileIndexing() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
indexer.stop();
countDownLatch.countDown();
assertThat(indexer.getPosition(), equalTo(2));
assertTrue(awaitBusy(() -> isStopped.get()));
assertFalse(isFinished.get());
} finally {
executor.shutdownNow();
}
}
}

View File

@@ -93,11 +93,11 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
new StartDataFrameTransformAction.Request(id, false)).actionGet();
}
protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) {
DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
protected AcknowledgedResponse deleteDataFrameTransform(String id) {
AcknowledgedResponse response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
new DeleteDataFrameTransformAction.Request(id))
.actionGet();
if (response.isDeleted()) {
if (response.isAcknowledged()) {
transformConfigs.remove(id);
}
return response;

View File

@@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.After;
import org.junit.AfterClass;
import java.io.IOException;
@@ -278,16 +279,20 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
adminClient().performRequest(request);
}
@AfterClass
public static void removeIndices() throws Exception {
@After
public void waitForDataFrame() throws Exception {
wipeDataFrameTransforms();
waitForPendingDataFrameTasks();
}
@AfterClass
public static void removeIndices() throws Exception {
// we might have disabled wiping indices, but now it's time to get rid of them
// note: cannot use super.cleanUpCluster() as this method must be static
wipeIndices();
}
protected static void wipeDataFrameTransforms() throws IOException, InterruptedException {
public void wipeDataFrameTransforms() throws IOException, InterruptedException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");

View File

@@ -5,93 +5,73 @@
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.List;
import java.io.IOException;
public class TransportDeleteDataFrameTransformAction extends TransportTasksAction<DataFrameTransformTask, Request, Response, Response> {
public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final DataFrameTransformsConfigManager transformsConfigManager;
@Inject
public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) {
super(DeleteDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
Response::new, ThreadPool.Names.SAME);
public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameTransformsConfigManager transformsConfigManager) {
super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
Request::new, indexNameExpressionResolver);
this.transformsConfigManager = transformsConfigManager;
}
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
assert tasks.size() + taskOperationFailures.size() == 1;
boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);
return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
assert task.getTransformId().equals(request.getId());
IndexerState state = task.getState().getIndexerState();
if (state.equals(IndexerState.STOPPED)) {
task.onCancelled();
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
listener.onResponse(new Response(true));
}, listener::onFailure));
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
protected AcknowledgedResponse read(StreamInput in) throws IOException {
AcknowledgedResponse response = new AcknowledgedResponse();
response.readFrom(in);
return response;
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) throws Exception {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() +
"] as the task is running. Stop the task first", RestStatus.CONFLICT));
} else {
listener.onFailure(new IllegalStateException("Could not delete transform [" + request.getId() + "] because "
+ "indexer state is [" + state + "]. Transform must be [" + IndexerState.STOPPED + "] before deletion."));
// Task is not running, delete the configuration document
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
r -> listener.onResponse(new AcknowledgedResponse(r)),
listener::onFailure));
}
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster()) {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
super.doExecute(task, request, listener);
} else {
// we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index,
// if so delete the orphaned document and do not throw (for the normal case we want to stop the task first,
// then delete the configuration document if and only if the data frame transform is in stopped state)
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
listener.onResponse(new Response(true));
return;
}, listener::onFailure));
}
} else {
// Delegates DeleteTransform to elected master node, so it becomes the coordinating node.
// Non-master nodes may have a stale cluster state that shows transforms which are cancelled
// on the master, which makes testing difficult.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
}
}
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
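
From a caller's point of view, the rewritten action behaves as sketched below (the transform id is illustrative; compare the integration-test usage further down):

AcknowledgedResponse response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
    new DeleteDataFrameTransformAction.Request("my-transform")).actionGet();
// acknowledged == true when no persistent task exists for the id and the config
// document was deleted; if the task is still running, the action fails with a
// 409 CONFLICT asking the caller to stop the transform first.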

View File

@@ -132,7 +132,6 @@ public class TransportGetDataFrameTransformsStatsAction extends
},
e -> {
// If the index to search, or the individual config is not there, just return empty
logger.error("failed to expand ids", e);
if (e instanceof ResourceNotFoundException) {
finalListener.onResponse(new Response(Collections.emptyList()));
} else {

View File

@@ -6,8 +6,6 @@
package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
@@ -63,8 +61,6 @@ import java.util.stream.Collectors;
public class TransportPutDataFrameTransformAction
extends TransportMasterNodeAction<PutDataFrameTransformAction.Request, AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class);
private final XPackLicenseState licenseState;
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;

View File

@@ -5,64 +5,85 @@
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
public class TransportStopDataFrameTransformAction extends
TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private final ThreadPool threadPool;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, ThreadPool threadPool,
PersistentTasksService persistentTasksService,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates stop data frame to elected master node so it becomes the coordinating node.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master node"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new));
}
} else {
final ActionListener<StopDataFrameTransformAction.Response> finalListener;
if (request.waitForCompletion()) {
finalListener = waitForStopListener(request, listener);
} else {
finalListener = listener;
}
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
expandedIds -> {
request.setExpandedIds(new HashSet<>(expandedIds));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
super.doExecute(task, request, listener);
},
listener::onFailure
));
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
expandedIds -> {
request.setExpandedIds(new HashSet<>(expandedIds));
request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
super.doExecute(task, request, finalListener);
},
listener::onFailure
));
}
}
@Override
@@ -84,42 +105,9 @@ public class TransportStopDataFrameTransformAction extends
RestStatus.CONFLICT));
return;
}
if (request.waitForCompletion() == false) {
transformTask.stop(listener);
} else {
ActionListener<StopDataFrameTransformAction.Response> blockingListener = ActionListener.wrap(response -> {
if (response.isStopped()) {
// The Task acknowledged that it is stopped/stopping... wait until the status actually
// changes over before returning. Switch over to Generic threadpool so
// we don't block the network thread
threadPool.generic().execute(() -> {
try {
long untilInNanos = System.nanoTime() + request.getTimeout().getNanos();
while (System.nanoTime() - untilInNanos < 0) {
if (transformTask.isStopped()) {
listener.onResponse(response);
return;
}
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
}
// ran out of time
listener.onFailure(new ElasticsearchTimeoutException(
DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT,
request.getTimeout().getStringRep(), request.getId())));
} catch (InterruptedException e) {
listener.onFailure(new ElasticsearchException(DataFrameMessages.getMessage(
DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e));
}
});
} else {
// Did not acknowledge stop, just return the response
listener.onResponse(response);
}
}, listener::onFailure);
transformTask.stop(blockingListener);
}
transformTask.stop();
listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
} else {
listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));
@@ -139,4 +127,47 @@ public class TransportStopDataFrameTransformAction extends
boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
return new StopDataFrameTransformAction.Response(allStopped);
}
private ActionListener<StopDataFrameTransformAction.Response>
waitForStopListener(StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
return ActionListener.wrap(
response -> {
// Wait until the persistent task is stopped
// Switch over to Generic threadpool so we don't block the network thread
threadPool.generic().execute(() ->
waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), listener));
},
listener::onFailure
);
}
private void waitForDataFrameStopped(Collection<String> persistentTaskIds, TimeValue timeout,
ActionListener<StopDataFrameTransformAction.Response> listener) {
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
if (persistentTasksCustomMetaData == null) {
return true;
}
for (String persistentTaskId : persistentTaskIds) {
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false;
}
}
return true;
}, timeout, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
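
Restated as a standalone java.util.function.Predicate (same metadata API as above, extracted only for clarity): the stop request may return only once none of the expanded transform ids still has an entry in the cluster state's persistent tasks; those entries are removed by the indexer's onStop() -> shutdown() chain in DataFrameTransformTask below.

Predicate<PersistentTasksCustomMetaData> allTasksComplete = tasks ->
    tasks == null || persistentTaskIds.stream().allMatch(id -> tasks.getTask(id) == null);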

View File

@@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
@@ -33,7 +34,7 @@ public class RestDeleteDataFrameTransformAction extends BaseRestHandler {
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);
return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request,
new BaseTasksResponseToXContentListener<>(channel));
new RestToXContentListener<>(channel));
}
@Override

View File

@@ -27,7 +27,6 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -86,7 +85,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
String initialReason = null;
long initialGeneration = 0;
Map<String, Object> initialPosition = null;
logger.info("[{}] init, got state: [{}]", transform.getId(), state != null);
logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null);
if (state != null) {
initialTaskState = state.getTaskState();
initialReason = state.getReason();
@@ -219,51 +218,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
));
}
public synchronized void stop(ActionListener<StopDataFrameTransformAction.Response> listener) {
public synchronized void stop() {
if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
getTransformId()));
return;
}
// taskState is initialized as STOPPED and is updated in tandem with the indexerState
// Consequently, if it is STOPPED, we consider the whole task STOPPED.
if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
listener.onResponse(new StopDataFrameTransformAction.Response(true));
return;
}
final IndexerState newState = getIndexer().stop();
switch (newState) {
case STOPPED:
// Fall through to `STOPPING` as the behavior is the same for both, we should persist for both
case STOPPING:
// update the persistent state to STOPPED. There are two scenarios and both are safe:
// 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent
// position.
// 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint,
// overwrite some docs and eventually checkpoint.
taskState.set(DataFrameTransformTaskState.STOPPED);
DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STOPPED,
IndexerState.STOPPED,
getIndexer().getPosition(),
currentCheckpoint.get(),
stateReason.get(),
getIndexer().getProgress());
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
listener.onResponse(new StopDataFrameTransformAction.Response(true));
},
exc -> listener.onFailure(new ElasticsearchException(
"Error while updating state for data frame transform [{}] to [{}]", exc,
transform.getId(),
state.getIndexerState()))));
break;
default:
listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]",
transform.getId(), newState));
break;
}
getIndexer().stop();
}
@Override
@@ -281,12 +246,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
/**
* Attempt to gracefully cleanup the data frame transform so it can be terminated.
* This tries to remove the job from the scheduler, and potentially any other
* cleanup operations in the future
* This tries to remove the job from the scheduler and complete the persistent task
*/
synchronized void shutdown() {
try {
logger.info("Data frame indexer [" + transform.getId() + "] received abort request, stopping indexer.");
schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
schedulerEngine.unregister(this);
} catch (Exception e) {
@@ -613,6 +576,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
}
@Override
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
transformTask.shutdown();
}
@Override
protected void onAbort() {
auditor.info(transformConfig.getId(), "Received abort request, stopping indexer");

View File

@@ -106,6 +106,7 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
wait_for_completion: true
- match: { stopped: true }
- do:
@@ -199,6 +200,7 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-later"
wait_for_completion: true
- match: { stopped: true }
- do:
@@ -232,6 +234,8 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { stopped: true }
- do: