[Transform] Improve force stop robustness in case of an error (#51072)

If a transform config got lost (e.g. because the internal index disappeared) tasks could not be
stopped using transform API. This change makes it possible to stop transforms without a config,
meaning to remove the background task. In order to do so force must be set to true.
This commit is contained in:
Hendrik Muhs 2020-01-17 07:40:02 +01:00
parent fac509836a
commit 13343b15c9
6 changed files with 238 additions and 78 deletions

View File

@ -17,6 +17,8 @@ public class TransformMessages {
"Interrupted while waiting for transform [{0}] to stop";
public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION =
"Failed to validate configuration";
public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration";

View File

@ -918,12 +918,4 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
deleteIndex(indexName);
}
private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}
}

View File

@ -73,7 +73,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void createReviewsIndex(String indexName, int numDocs) throws IOException {
int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1};
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
@ -158,6 +158,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
}
/**
* Create a simple dataset for testing with reviewers, ratings and businesses
*/
@ -182,9 +183,8 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
//Set frequency high for testing
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
// Set frequency high for testing
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
@ -206,7 +206,6 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
throws IOException {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
@ -226,18 +225,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
createDataframeTransformRequest.setJsonEntity(config);
@ -245,11 +244,11 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void startDataframeTransform(String transformId) throws IOException {
startDataframeTransform(transformId, null);
protected void startTransform(String transformId) throws IOException {
startTransform(transformId, null);
}
protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException {
// start the transform
final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader);
if (warnings.length > 0) {
@ -280,10 +279,10 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
startAndWaitForTransform(transformId, dataFrameIndex, authHeader, new String[0]);
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
String authHeader, String... warnings) throws Exception {
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings)
throws Exception {
// start the transform
startDataframeTransform(transformId, authHeader, warnings);
startTransform(transformId, authHeader, warnings);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId);
@ -292,18 +291,14 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
refreshIndex(dataFrameIndex);
}
protected void startAndWaitForContinuousTransform(String transformId,
String dataFrameIndex,
String authHeader) throws Exception {
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
startAndWaitForContinuousTransform(transformId, dataFrameIndex, authHeader, 1L);
}
protected void startAndWaitForContinuousTransform(String transformId,
String dataFrameIndex,
String authHeader,
long checkpoint) throws Exception {
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader, long checkpoint)
throws Exception {
// start the transform
startDataframeTransform(transformId, authHeader, new String[0]);
startTransform(transformId, authHeader, new String[0]);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForTransformCheckpoint(transformId, checkpoint);
@ -323,9 +318,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
void waitForDataFrameStopped(String transformId) throws Exception {
assertBusy(() -> {
assertEquals("stopped", getDataFrameTransformState(transformId));
}, 15, TimeUnit.SECONDS);
assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS);
}
void waitForDataFrameCheckpoint(String transformId) throws Exception {
@ -341,7 +334,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
@SuppressWarnings("unchecked")
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
protected static List<Map<String, Object>> getTransforms() throws IOException {
Response response = adminClient().performRequest(new Request("GET", getTransformEndpoint() + "_all"));
Map<String, Object> transforms = entityAsMap(response);
List<Map<String, Object>> transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms);
@ -349,12 +342,12 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
return transformConfigs == null ? Collections.emptyList() : transformConfigs;
}
protected static String getDataFrameTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
protected static String getTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
}
protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
protected static Map<?, ?> getTransformStateAndStats(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
if (transforms.isEmpty()) {
@ -383,7 +376,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
public void wipeTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
List<Map<String, Object>> transformConfigs = getTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Request request = new Request("POST", getTransformEndpoint() + transformId + "/_stop");
@ -395,7 +388,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
String state = getDataFrameTransformState(transformId);
String state = getTransformState(transformId);
assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state);
}
@ -405,7 +398,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
// transforms should be all gone
transformConfigs = getDataFrameTransforms();
transformConfigs = getTransforms();
assertTrue(transformConfigs.isEmpty());
// the configuration index should be empty
@ -437,11 +430,15 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
protected void setupDataAccessRole(String role, String... indices) throws IOException {
String indicesStr = Arrays.stream(indices).collect(Collectors.joining("\",\"", "\"", "\""));
Request request = new Request("PUT", "/_security/role/" + role);
request.setJsonEntity("{"
+ " \"indices\" : ["
+ " { \"names\": [" + indicesStr + "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
+ " ]"
+ "}");
request.setJsonEntity(
"{"
+ " \"indices\" : ["
+ " { \"names\": ["
+ indicesStr
+ "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
+ " ]"
+ "}"
);
client().performRequest(request);
}
@ -450,13 +447,18 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
String rolesStr = roles.stream().collect(Collectors.joining("\",\"", "\"", "\""));
Request request = new Request("PUT", "/_security/user/" + user);
request.setJsonEntity("{"
+ " \"password\" : \"" + password + "\","
+ " \"roles\" : [ " + rolesStr + " ]"
+ "}");
request.setJsonEntity("{" + " \"password\" : \"" + password + "\"," + " \"roles\" : [ " + rolesStr + " ]" + "}");
client().performRequest(request);
}
protected void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}
protected static String getTransformEndpoint() {
return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS;
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class TransformRobustnessIT extends TransformRestTestCase {
public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
String indexName = "continuous_reviews";
createReviewsIndex(indexName);
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId);
String config = "{"
+ " \"source\": {\"index\":\""
+ indexName
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertEquals(1, getTransforms().size());
// there shouldn't be a task yet
assertEquals(0, getNumberOfTransformTasks());
startAndWaitForContinuousTransform(transformId, transformIndex, null);
assertTrue(indexExists(transformIndex));
// a task exists
assertEquals(1, getNumberOfTransformTasks());
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertNotNull(getTransformState(transformId));
assertEquals(1, getTransforms().size());
// delete the transform index
beEvilAndDeleteTheTransformIndex();
// transform is gone
assertEquals(0, getTransforms().size());
// but the task is still there
assertEquals(1, getNumberOfTransformTasks());
Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop");
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest));
assertEquals(409, e.getResponse().getStatusLine().getStatusCode());
assertThat(
e.getMessage(),
containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.")
);
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(true));
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// the task is gone
assertEquals(1, getNumberOfTransformTasks());
}
@SuppressWarnings("unchecked")
private int getNumberOfTransformTasks() throws IOException {
final Request tasksRequest = new Request("GET", "/_tasks");
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
if (nodes == null) {
return 0;
}
int foundTasks = 0;
for (Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
foundTasks += tasks != null ? tasks.size() : 0;
}
return foundTasks;
}
private void beEvilAndDeleteTheTransformIndex() throws IOException {
adminClient().performRequest(new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME));
}
}

View File

@ -40,16 +40,15 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
// Set logging level to trace
// see: https://github.com/elastic/elasticsearch/issues/45562
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
addFailureRetrySetting
.setJsonEntity(
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \""
+ 0
+ "\","
+ "\"logger.org.elasticsearch.action.bulk\": \"info\","
+ // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\","
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"
);
addFailureRetrySetting.setJsonEntity(
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \""
+ 0
+ "\","
+ "\"logger.org.elasticsearch.action.bulk\": \"info\","
+ // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\","
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"
);
client().performRequest(addFailureRetrySetting);
}
@ -70,9 +69,9 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);
failureTransforms.add(transformId);
startDataframeTransform(transformId);
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
// Verify we have failed for the expected reason
@ -94,7 +93,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
stopTransform(transformId, true);
awaitState(transformId, TransformStats.State.STOPPED);
fullState = getDataFrameState(transformId);
fullState = getTransformStateAndStats(transformId);
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
}
@ -105,9 +104,9 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
createDestinationIndexWithBadMapping(dataFrameIndex);
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
failureTransforms.add(transformId);
startDataframeTransform(transformId);
startTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
Map<?, ?> fullState = getTransformStateAndStats(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
// Verify we have failed for the expected reason
@ -119,7 +118,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
+ "\\]. Use force stop and then restart the transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
assertBusy(() -> {
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
ResponseException ex = expectThrows(ResponseException.class, () -> startTransform(transformId));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
@ -132,7 +131,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
private void awaitState(String transformId, TransformStats.State state) throws Exception {
assertBusy(() -> {
String currentState = getDataFrameTransformState(transformId);
String currentState = getTransformState(transformId);
assertThat(currentState, equalTo(state.value()));
}, 180, TimeUnit.SECONDS); // It should not take this long, but if the scheduler gets deferred, it could
}
@ -142,8 +141,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder
.startObject("mappings")
builder.startObject("mappings")
.startObject("properties")
.startObject("reviewer")
.field("type", "long")

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
@ -19,22 +20,28 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Request;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Response;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.TransformServices;
@ -47,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -127,6 +135,25 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
}
}
static Tuple<Set<String>, Set<String>> findTasksWithoutConfig(ClusterState state, String transformId) {
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
Set<String> taskIds = new HashSet<>();
Set<String> executorNodes = new HashSet<>();
Predicate<PersistentTask<?>> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> {
TransformTaskParams transformParams = (TransformTaskParams) t.getParams();
return Regex.simpleMatch(transformId, transformParams.getId());
};
for (PersistentTasksCustomMetaData.PersistentTask<?> pTask : tasks.findTasks(TransformField.TASK_NAME, taskMatcher)) {
executorNodes.add(pTask.getExecutorNode());
taskIds.add(pTask.getId());
}
return new Tuple<>(taskIds, executorNodes);
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
@ -160,7 +187,31 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state));
super.doExecute(task, request, finalListener);
}, listener::onFailure)
}, e -> {
if (e instanceof ResourceNotFoundException) {
Tuple<Set<String>, Set<String>> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId());
if (runningTasksAndNodes.v1().isEmpty()) {
listener.onFailure(e);
// found transforms without a config
} else if (request.isForce()) {
request.setExpandedIds(runningTasksAndNodes.v1());
request.setNodes(runningTasksAndNodes.v2().toArray(new String[0]));
super.doExecute(task, request, finalListener);
} else {
listener.onFailure(
new ElasticsearchStatusException(
TransformMessages.getMessage(
TransformMessages.REST_STOP_TRANSFORM_WITHOUT_CONFIG,
Strings.arrayToCommaDelimitedString(runningTasksAndNodes.v1().toArray(new String[0]))
),
RestStatus.CONFLICT
)
);
}
} else {
listener.onFailure(e);
}
})
);
}
}