mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-09 06:25:07 +00:00
* [ML][Transforms] remove `force` flag from _start (#46414) * [ML][Transforms] remove `force` flag from _start * fixing expected error message * adjusting bwc version
This commit is contained in:
parent
d1f47cf00e
commit
9cf9c64ec2
@ -209,7 +209,6 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformsStatsAction;
|
|||||||
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
|
|
||||||
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
||||||
@ -424,7 +423,6 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||||||
// Data Frame
|
// Data Frame
|
||||||
PutTransformAction.INSTANCE,
|
PutTransformAction.INSTANCE,
|
||||||
StartTransformAction.INSTANCE,
|
StartTransformAction.INSTANCE,
|
||||||
StartTransformTaskAction.INSTANCE,
|
|
||||||
StopTransformAction.INSTANCE,
|
StopTransformAction.INSTANCE,
|
||||||
DeleteTransformAction.INSTANCE,
|
DeleteTransformAction.INSTANCE,
|
||||||
GetTransformsAction.INSTANCE,
|
GetTransformsAction.INSTANCE,
|
||||||
|
@ -36,7 +36,7 @@ public class TransformMessages {
|
|||||||
" Use force stop to stop the data frame transform.";
|
" Use force stop to stop the data frame transform.";
|
||||||
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
|
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
|
||||||
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
|
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
|
||||||
"Use force start to restart data frame transform once error is resolved.";
|
"Use force stop and then restart the data frame transform once error is resolved.";
|
||||||
|
|
||||||
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
|
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
|
||||||
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
|
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
package org.elasticsearch.xpack.core.transform.action;
|
package org.elasticsearch.xpack.core.transform.action;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.ActionType;
|
import org.elasticsearch.action.ActionType;
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||||
@ -33,32 +34,30 @@ public class StartTransformAction extends ActionType<StartTransformAction.Respon
|
|||||||
public static class Request extends AcknowledgedRequest<Request> {
|
public static class Request extends AcknowledgedRequest<Request> {
|
||||||
|
|
||||||
private final String id;
|
private final String id;
|
||||||
private final boolean force;
|
|
||||||
|
|
||||||
public Request(String id, boolean force) {
|
public Request(String id) {
|
||||||
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
|
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
|
||||||
this.force = force;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Request(StreamInput in) throws IOException {
|
public Request(StreamInput in) throws IOException {
|
||||||
super(in);
|
super(in);
|
||||||
id = in.readString();
|
id = in.readString();
|
||||||
force = in.readBoolean();
|
if(in.getVersion().before(Version.V_7_5_0)) {
|
||||||
|
in.readBoolean();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getId() {
|
public String getId() {
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isForce() {
|
|
||||||
return force;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeString(id);
|
out.writeString(id);
|
||||||
out.writeBoolean(force);
|
if(out.getVersion().before(Version.V_7_5_0)) {
|
||||||
|
out.writeBoolean(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,152 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.xpack.core.transform.action;
|
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
|
||||||
import org.elasticsearch.action.ActionType;
|
|
||||||
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
|
|
||||||
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
||||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
||||||
import org.elasticsearch.tasks.Task;
|
|
||||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
|
||||||
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
public class StartTransformTaskAction extends ActionType<StartTransformTaskAction.Response> {
|
|
||||||
|
|
||||||
public static final StartTransformTaskAction INSTANCE = new StartTransformTaskAction();
|
|
||||||
public static final String NAME = "cluster:admin/data_frame/start_task";
|
|
||||||
|
|
||||||
private StartTransformTaskAction() {
|
|
||||||
super(NAME, StartTransformTaskAction.Response::new);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Request extends BaseTasksRequest<Request> {
|
|
||||||
|
|
||||||
private final String id;
|
|
||||||
private final boolean force;
|
|
||||||
|
|
||||||
public Request(String id, boolean force) {
|
|
||||||
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
|
|
||||||
this.force = force;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Request(StreamInput in) throws IOException {
|
|
||||||
super(in);
|
|
||||||
id = in.readString();
|
|
||||||
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
|
|
||||||
force = in.readBoolean();
|
|
||||||
} else {
|
|
||||||
// The behavior before V_7_4_0 was that this flag did not exist,
|
|
||||||
// assuming previous checks allowed this task to be started.
|
|
||||||
force = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isForce() {
|
|
||||||
return force;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
|
||||||
super.writeTo(out);
|
|
||||||
out.writeString(id);
|
|
||||||
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
|
|
||||||
out.writeBoolean(force);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean match(Task task) {
|
|
||||||
return task.getDescription().equals(TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ActionRequestValidationException validate() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(id, force);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (obj == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (getClass() != obj.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
Request other = (Request) obj;
|
|
||||||
return Objects.equals(id, other.id) && force == other.force;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Response extends BaseTasksResponse implements ToXContentObject {
|
|
||||||
private final boolean started;
|
|
||||||
|
|
||||||
public Response(StreamInput in) throws IOException {
|
|
||||||
super(in);
|
|
||||||
started = in.readBoolean();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Response(boolean started) {
|
|
||||||
super(Collections.emptyList(), Collections.emptyList());
|
|
||||||
this.started = started;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isStarted() {
|
|
||||||
return started;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
|
||||||
super.writeTo(out);
|
|
||||||
out.writeBoolean(started);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
|
||||||
builder.startObject();
|
|
||||||
toXContentCommon(builder, params);
|
|
||||||
builder.field("started", started);
|
|
||||||
builder.endObject();
|
|
||||||
return builder;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (obj == null || getClass() != obj.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
Response response = (Response) obj;
|
|
||||||
return started == response.started;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(started);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.transform.action.StartTransformAction.Reques
|
|||||||
public class StartTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
|
public class StartTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
|
||||||
@Override
|
@Override
|
||||||
protected Request createTestInstance() {
|
protected Request createTestInstance() {
|
||||||
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
|
return new Request(randomAlphaOfLengthBetween(1, 20));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,23 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.xpack.core.transform.action;
|
|
||||||
|
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
|
||||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
|
||||||
|
|
||||||
public class StartTransformTaskActionRequestTests extends
|
|
||||||
AbstractWireSerializingTestCase<StartTransformTaskAction.Request> {
|
|
||||||
@Override
|
|
||||||
protected StartTransformTaskAction.Request createTestInstance() {
|
|
||||||
return new StartTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Writeable.Reader<StartTransformTaskAction.Request> instanceReader() {
|
|
||||||
return StartTransformTaskAction.Request::new;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,23 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.xpack.core.transform.action;
|
|
||||||
|
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
|
||||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
|
||||||
|
|
||||||
public class StartTransformTaskActionResponseTests extends
|
|
||||||
AbstractWireSerializingTestCase<StartTransformTaskAction.Response> {
|
|
||||||
@Override
|
|
||||||
protected StartTransformTaskAction.Response createTestInstance() {
|
|
||||||
return new StartTransformTaskAction.Response(randomBoolean());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Writeable.Reader<StartTransformTaskAction.Response> instanceReader() {
|
|
||||||
return StartTransformTaskAction.Response::new;
|
|
||||||
}
|
|
||||||
}
|
|
@ -62,7 +62,7 @@ teardown:
|
|||||||
- match: { acknowledged: true }
|
- match: { acknowledged: true }
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/
|
catch: /Cannot start transform \[airline-transform-start-stop\] as it is already started/
|
||||||
data_frame.start_data_frame_transform:
|
data_frame.start_data_frame_transform:
|
||||||
transform_id: "airline-transform-start-stop"
|
transform_id: "airline-transform-start-stop"
|
||||||
|
|
||||||
|
@ -224,14 +224,13 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
|||||||
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void startDataframeTransform(String transformId, boolean force) throws IOException {
|
protected void startDataframeTransform(String transformId) throws IOException {
|
||||||
startDataframeTransform(transformId, force, null);
|
startDataframeTransform(transformId, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void startDataframeTransform(String transformId, boolean force, String authHeader, String... warnings) throws IOException {
|
protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
|
||||||
// start the transform
|
// start the transform
|
||||||
final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
|
final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
|
||||||
startTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
|
|
||||||
if (warnings.length > 0) {
|
if (warnings.length > 0) {
|
||||||
startTransformRequest.setOptions(expectWarnings(warnings));
|
startTransformRequest.setOptions(expectWarnings(warnings));
|
||||||
}
|
}
|
||||||
@ -259,7 +258,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
|||||||
protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
|
protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
|
||||||
String authHeader, String... warnings) throws Exception {
|
String authHeader, String... warnings) throws Exception {
|
||||||
// start the transform
|
// start the transform
|
||||||
startDataframeTransform(transformId, false, authHeader, warnings);
|
startDataframeTransform(transformId, authHeader, warnings);
|
||||||
assertTrue(indexExists(dataFrameIndex));
|
assertTrue(indexExists(dataFrameIndex));
|
||||||
// wait until the dataframe has been created and all data is available
|
// wait until the dataframe has been created and all data is available
|
||||||
waitForDataFrameCheckpoint(transformId);
|
waitForDataFrameCheckpoint(transformId);
|
||||||
@ -279,7 +278,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
|||||||
String authHeader,
|
String authHeader,
|
||||||
long checkpoint) throws Exception {
|
long checkpoint) throws Exception {
|
||||||
// start the transform
|
// start the transform
|
||||||
startDataframeTransform(transformId, false, authHeader, new String[0]);
|
startDataframeTransform(transformId, authHeader, new String[0]);
|
||||||
assertTrue(indexExists(dataFrameIndex));
|
assertTrue(indexExists(dataFrameIndex));
|
||||||
// wait until the dataframe has been created and all data is available
|
// wait until the dataframe has been created and all data is available
|
||||||
waitForDataFrameCheckpoint(transformId, checkpoint);
|
waitForDataFrameCheckpoint(transformId, checkpoint);
|
||||||
|
@ -28,8 +28,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.nullValue;
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
||||||
import static org.hamcrest.Matchers.oneOf;
|
|
||||||
|
|
||||||
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
|
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
|
||||||
|
|
||||||
@ -65,7 +63,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
|
|||||||
createDestinationIndexWithBadMapping(dataFrameIndex);
|
createDestinationIndexWithBadMapping(dataFrameIndex);
|
||||||
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
|
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
|
||||||
failureTransforms.add(transformId);
|
failureTransforms.add(transformId);
|
||||||
startDataframeTransform(transformId, false);
|
startDataframeTransform(transformId);
|
||||||
awaitState(transformId, TransformStats.State.FAILED);
|
awaitState(transformId, TransformStats.State.FAILED);
|
||||||
Map<?, ?> fullState = getDataFrameState(transformId);
|
Map<?, ?> fullState = getDataFrameState(transformId);
|
||||||
final String failureReason = "task encountered more than 0 failures; latest failure: " +
|
final String failureReason = "task encountered more than 0 failures; latest failure: " +
|
||||||
@ -89,14 +87,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
|
|||||||
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
|
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testForceStartFailedTransform() throws Exception {
|
public void testStartFailedTransform() throws Exception {
|
||||||
String transformId = "test-force-start-failed-transform";
|
String transformId = "test-force-start-failed-transform";
|
||||||
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
|
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
|
||||||
String dataFrameIndex = "failure_pivot_reviews";
|
String dataFrameIndex = "failure_pivot_reviews";
|
||||||
createDestinationIndexWithBadMapping(dataFrameIndex);
|
createDestinationIndexWithBadMapping(dataFrameIndex);
|
||||||
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
|
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
|
||||||
failureTransforms.add(transformId);
|
failureTransforms.add(transformId);
|
||||||
startDataframeTransform(transformId, false);
|
startDataframeTransform(transformId);
|
||||||
awaitState(transformId, TransformStats.State.FAILED);
|
awaitState(transformId, TransformStats.State.FAILED);
|
||||||
Map<?, ?> fullState = getDataFrameState(transformId);
|
Map<?, ?> fullState = getDataFrameState(transformId);
|
||||||
final String failureReason = "task encountered more than 0 failures; latest failure: " +
|
final String failureReason = "task encountered more than 0 failures; latest failure: " +
|
||||||
@ -106,26 +104,15 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
|
|||||||
|
|
||||||
final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
|
final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
|
||||||
"as it is in a failed state with failure: [" + failureReason +
|
"as it is in a failed state with failure: [" + failureReason +
|
||||||
"]. Use force start to restart data frame transform once error is resolved.";
|
"]. Use force stop and then restart the data frame transform once error is resolved.";
|
||||||
// Verify that we cannot start the transform when the task is in a failed state
|
// Verify that we cannot start the transform when the task is in a failed state
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
|
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
|
||||||
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
|
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
|
||||||
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
|
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
|
||||||
equalTo(expectedFailure));
|
equalTo(expectedFailure));
|
||||||
}, 60, TimeUnit.SECONDS);
|
}, 60, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// Correct the failure by deleting the destination index
|
|
||||||
deleteIndex(dataFrameIndex);
|
|
||||||
// Force start the data frame to indicate failure correction
|
|
||||||
startDataframeTransform(transformId, true);
|
|
||||||
|
|
||||||
// Verify that we have started and that our reason is cleared
|
|
||||||
fullState = getDataFrameState(transformId);
|
|
||||||
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
|
|
||||||
assertThat(XContentMapValues.extractValue("state", fullState), oneOf("started", "indexing"));
|
|
||||||
assertThat((Integer)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThanOrEqualTo(1));
|
|
||||||
|
|
||||||
stopDataFrameTransform(transformId, true);
|
stopDataFrameTransform(transformId, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +50,6 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformsStatsAction;
|
|||||||
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
|
|
||||||
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
|
||||||
import org.elasticsearch.xpack.transform.action.TransportDeleteDataFrameTransformAction;
|
import org.elasticsearch.xpack.transform.action.TransportDeleteDataFrameTransformAction;
|
||||||
@ -59,7 +58,6 @@ import org.elasticsearch.xpack.transform.action.TransportGetDataFrameTransformsS
|
|||||||
import org.elasticsearch.xpack.transform.action.TransportPreviewDataFrameTransformAction;
|
import org.elasticsearch.xpack.transform.action.TransportPreviewDataFrameTransformAction;
|
||||||
import org.elasticsearch.xpack.transform.action.TransportPutDataFrameTransformAction;
|
import org.elasticsearch.xpack.transform.action.TransportPutDataFrameTransformAction;
|
||||||
import org.elasticsearch.xpack.transform.action.TransportStartDataFrameTransformAction;
|
import org.elasticsearch.xpack.transform.action.TransportStartDataFrameTransformAction;
|
||||||
import org.elasticsearch.xpack.transform.action.TransportStartDataFrameTransformTaskAction;
|
|
||||||
import org.elasticsearch.xpack.transform.action.TransportStopDataFrameTransformAction;
|
import org.elasticsearch.xpack.transform.action.TransportStopDataFrameTransformAction;
|
||||||
import org.elasticsearch.xpack.transform.action.TransportUpdateDataFrameTransformAction;
|
import org.elasticsearch.xpack.transform.action.TransportUpdateDataFrameTransformAction;
|
||||||
import org.elasticsearch.xpack.transform.checkpoint.DataFrameTransformsCheckpointService;
|
import org.elasticsearch.xpack.transform.checkpoint.DataFrameTransformsCheckpointService;
|
||||||
@ -156,7 +154,6 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
|
|||||||
return Arrays.asList(
|
return Arrays.asList(
|
||||||
new ActionHandler<>(PutTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class),
|
new ActionHandler<>(PutTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class),
|
||||||
new ActionHandler<>(StartTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class),
|
new ActionHandler<>(StartTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class),
|
||||||
new ActionHandler<>(StartTransformTaskAction.INSTANCE, TransportStartDataFrameTransformTaskAction.class),
|
|
||||||
new ActionHandler<>(StopTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
|
new ActionHandler<>(StopTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
|
||||||
new ActionHandler<>(DeleteTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
|
new ActionHandler<>(DeleteTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
|
||||||
new ActionHandler<>(GetTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),
|
new ActionHandler<>(GetTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),
|
||||||
|
@ -36,7 +36,6 @@ import org.elasticsearch.xpack.core.ClientHelper;
|
|||||||
import org.elasticsearch.xpack.core.XPackField;
|
import org.elasticsearch.xpack.core.XPackField;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
|
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
||||||
@ -55,6 +54,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
import static org.elasticsearch.xpack.core.transform.TransformMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM;
|
||||||
|
|
||||||
public class TransportStartDataFrameTransformAction extends
|
public class TransportStartDataFrameTransformAction extends
|
||||||
TransportMasterNodeAction<StartTransformAction.Request, StartTransformAction.Response> {
|
TransportMasterNodeAction<StartTransformAction.Request, StartTransformAction.Response> {
|
||||||
|
|
||||||
@ -132,38 +133,20 @@ public class TransportStartDataFrameTransformAction extends
|
|||||||
newPersistentTaskActionListener);
|
newPersistentTaskActionListener);
|
||||||
} else {
|
} else {
|
||||||
TransformState transformState = (TransformState)existingTask.getState();
|
TransformState transformState = (TransformState)existingTask.getState();
|
||||||
if(transformState.getTaskState() == TransformTaskState.FAILED && request.isForce() == false) {
|
if(transformState.getTaskState() == TransformTaskState.FAILED) {
|
||||||
listener.onFailure(new ElasticsearchStatusException(
|
listener.onFailure(new ElasticsearchStatusException(
|
||||||
"Unable to start data frame transform [" + request.getId() +
|
TransformMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
|
||||||
"] as it is in a failed state with failure: [" + transformState.getReason() +
|
request.getId(),
|
||||||
"]. Use force start to restart data frame transform once error is resolved.",
|
transformState.getReason()),
|
||||||
RestStatus.CONFLICT));
|
RestStatus.CONFLICT));
|
||||||
} else if (transformState.getTaskState() != TransformTaskState.STOPPED &&
|
|
||||||
transformState.getTaskState() != TransformTaskState.FAILED) {
|
|
||||||
listener.onFailure(new ElasticsearchStatusException(
|
|
||||||
"Unable to start data frame transform [" + request.getId() +
|
|
||||||
"] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT));
|
|
||||||
} else {
|
} else {
|
||||||
// If the task already exists but is not assigned to a node, something is weird
|
// If the task already exists that means that it is either running or failed
|
||||||
// return a failure that includes the current assignment explanation (if one exists)
|
// Since it is not failed, that means it is running, we return a conflict.
|
||||||
if (existingTask.isAssigned() == false) {
|
listener.onFailure(new ElasticsearchStatusException(
|
||||||
String assignmentExplanation = "unknown reason";
|
"Cannot start transform [{}] as it is already started.",
|
||||||
if (existingTask.getAssignment() != null) {
|
RestStatus.CONFLICT,
|
||||||
assignmentExplanation = existingTask.getAssignment().getExplanation();
|
request.getId()
|
||||||
}
|
));
|
||||||
listener.onFailure(new ElasticsearchStatusException("Unable to start data frame transform [" +
|
|
||||||
request.getId() + "] as it is not assigned to a node, explanation: " + assignmentExplanation,
|
|
||||||
RestStatus.CONFLICT));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// If the task already exists and is assigned to a node, simply attempt to set it to start
|
|
||||||
ClientHelper.executeAsyncWithOrigin(client,
|
|
||||||
ClientHelper.DATA_FRAME_ORIGIN,
|
|
||||||
StartTransformTaskAction.INSTANCE,
|
|
||||||
new StartTransformTaskAction.Request(request.getId(), request.isForce()),
|
|
||||||
ActionListener.wrap(
|
|
||||||
r -> listener.onResponse(new StartTransformAction.Response(true)),
|
|
||||||
listener::onFailure));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -1,93 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.xpack.transform.action;
|
|
||||||
|
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
|
||||||
import org.elasticsearch.action.FailedNodeException;
|
|
||||||
import org.elasticsearch.action.TaskOperationFailure;
|
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
|
||||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
|
||||||
import org.elasticsearch.license.LicenseUtils;
|
|
||||||
import org.elasticsearch.license.XPackLicenseState;
|
|
||||||
import org.elasticsearch.tasks.Task;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
|
||||||
import org.elasticsearch.xpack.core.XPackField;
|
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
|
|
||||||
import org.elasticsearch.xpack.transform.transforms.DataFrameTransformTask;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal only transport class to change an allocated persistent task's state to started
|
|
||||||
*/
|
|
||||||
public class TransportStartDataFrameTransformTaskAction extends
|
|
||||||
TransportTasksAction<DataFrameTransformTask, StartTransformTaskAction.Request,
|
|
||||||
StartTransformTaskAction.Response, StartTransformTaskAction.Response> {
|
|
||||||
|
|
||||||
private final XPackLicenseState licenseState;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
public TransportStartDataFrameTransformTaskAction(TransportService transportService, ActionFilters actionFilters,
|
|
||||||
ClusterService clusterService, XPackLicenseState licenseState) {
|
|
||||||
super(StartTransformTaskAction.NAME, clusterService, transportService, actionFilters,
|
|
||||||
StartTransformTaskAction.Request::new, StartTransformTaskAction.Response::new,
|
|
||||||
StartTransformTaskAction.Response::new, ThreadPool.Names.SAME);
|
|
||||||
this.licenseState = licenseState;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doExecute(Task task, StartTransformTaskAction.Request request,
|
|
||||||
ActionListener<StartTransformTaskAction.Response> listener) {
|
|
||||||
|
|
||||||
if (!licenseState.isDataFrameAllowed()) {
|
|
||||||
listener.onFailure(LicenseUtils.newComplianceException(XPackField.Transform));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
super.doExecute(task, request, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void taskOperation(StartTransformTaskAction.Request request, DataFrameTransformTask transformTask,
|
|
||||||
ActionListener<StartTransformTaskAction.Response> listener) {
|
|
||||||
if (transformTask.getTransformId().equals(request.getId())) {
|
|
||||||
transformTask.start(null, request.isForce(), listener);
|
|
||||||
} else {
|
|
||||||
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
|
|
||||||
+ "] does not match request's ID [" + request.getId() + "]"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected StartTransformTaskAction.Response newResponse(StartTransformTaskAction.Request request,
|
|
||||||
List<StartTransformTaskAction.Response> tasks,
|
|
||||||
List<TaskOperationFailure> taskOperationFailures,
|
|
||||||
List<FailedNodeException> failedNodeExceptions) {
|
|
||||||
|
|
||||||
if (taskOperationFailures.isEmpty() == false) {
|
|
||||||
throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
|
|
||||||
} else if (failedNodeExceptions.isEmpty() == false) {
|
|
||||||
throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Either the transform doesn't exist (the user didn't create it yet) or was deleted
|
|
||||||
// after the StartAPI executed.
|
|
||||||
// In either case, let the user know
|
|
||||||
if (tasks.size() == 0) {
|
|
||||||
throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
assert tasks.size() == 1;
|
|
||||||
|
|
||||||
boolean allStarted = tasks.stream().allMatch(StartTransformTaskAction.Response::isStarted);
|
|
||||||
return new StartTransformTaskAction.Response(allStarted);
|
|
||||||
}
|
|
||||||
}
|
|
@ -24,8 +24,7 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
|
|||||||
@Override
|
@Override
|
||||||
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
|
||||||
String id = restRequest.param(TransformField.ID.getPreferredName());
|
String id = restRequest.param(TransformField.ID.getPreferredName());
|
||||||
boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
|
StartTransformAction.Request request = new StartTransformAction.Request(id);
|
||||||
StartTransformAction.Request request = new StartTransformAction.Request(id, force);
|
|
||||||
request.timeout(restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
|
request.timeout(restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
|
||||||
return channel -> client.execute(StartTransformAction.INSTANCE, request,
|
return channel -> client.execute(StartTransformAction.INSTANCE, request,
|
||||||
new RestToXContentListener<>(channel));
|
new RestToXContentListener<>(channel));
|
||||||
|
@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
|
|||||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
|
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||||
@ -141,9 +141,13 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
|
|||||||
|
|
||||||
final SetOnce<TransformState> stateHolder = new SetOnce<>();
|
final SetOnce<TransformState> stateHolder = new SetOnce<>();
|
||||||
|
|
||||||
ActionListener<StartTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
|
ActionListener<StartTransformAction.Response> startTaskListener = ActionListener.wrap(
|
||||||
response -> logger.info("Successfully completed and scheduled task in node operation"),
|
response -> logger.info("[{}] successfully completed and scheduled task in node operation", transformId),
|
||||||
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
|
failure -> {
|
||||||
|
auditor.error(transformId, "Failed to start data frame transform. " +
|
||||||
|
"Please stop and attempt to start again. Failure: " + failure.getMessage());
|
||||||
|
logger.error("Failed to start task ["+ transformId +"] in node operation", failure);
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// <7> load next checkpoint
|
// <7> load next checkpoint
|
||||||
@ -315,11 +319,10 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
|
|||||||
private void startTask(DataFrameTransformTask buildTask,
|
private void startTask(DataFrameTransformTask buildTask,
|
||||||
ClientDataFrameIndexerBuilder indexerBuilder,
|
ClientDataFrameIndexerBuilder indexerBuilder,
|
||||||
Long previousCheckpoint,
|
Long previousCheckpoint,
|
||||||
ActionListener<StartTransformTaskAction.Response> listener) {
|
ActionListener<StartTransformAction.Response> listener) {
|
||||||
buildTask.initializeIndexer(indexerBuilder);
|
buildTask.initializeIndexer(indexerBuilder);
|
||||||
// DataFrameTransformTask#start will fail if the task state is FAILED
|
// DataFrameTransformTask#start will fail if the task state is FAILED
|
||||||
// Will continue to attempt to start the indexer, even if the state is STARTED
|
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
|
||||||
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setNumFailureRetries(int numFailureRetries) {
|
private void setNumFailureRetries(int numFailureRetries) {
|
||||||
|
@ -26,8 +26,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
|||||||
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
|
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction;
|
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||||
import org.elasticsearch.xpack.core.transform.action.StartTransformTaskAction.Response;
|
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
|
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
|
||||||
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
import org.elasticsearch.xpack.core.transform.transforms.Transform;
|
||||||
@ -199,10 +198,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node
|
/**
|
||||||
synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
|
* Starts the transform and schedules it to be triggered in the future.
|
||||||
logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
|
*
|
||||||
if (taskState.get() == TransformTaskState.FAILED && force == false) {
|
* NOTE: This should ONLY be called via {@link DataFrameTransformPersistentTasksExecutor}
|
||||||
|
*
|
||||||
|
* @param startingCheckpoint The starting checkpoint, could null. Null indicates that there is no starting checkpoint
|
||||||
|
* @param listener The listener to alert once started
|
||||||
|
*/
|
||||||
|
synchronized void start(Long startingCheckpoint, ActionListener<StartTransformAction.Response> listener) {
|
||||||
|
logger.debug("[{}] start called with state [{}].", getTransformId(), getState());
|
||||||
|
if (taskState.get() == TransformTaskState.FAILED) {
|
||||||
listener.onFailure(new ElasticsearchStatusException(
|
listener.onFailure(new ElasticsearchStatusException(
|
||||||
TransformMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
|
TransformMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
|
||||||
getTransformId(),
|
getTransformId(),
|
||||||
@ -223,15 +229,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||||||
msg));
|
msg));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again.
|
|
||||||
if (taskState.get() == TransformTaskState.STARTED && failOnConflict) {
|
|
||||||
listener.onFailure(new ElasticsearchStatusException(
|
|
||||||
"Cannot start transform [{}] as it is already STARTED.",
|
|
||||||
RestStatus.CONFLICT,
|
|
||||||
getTransformId()
|
|
||||||
));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
final IndexerState newState = getIndexer().start();
|
final IndexerState newState = getIndexer().start();
|
||||||
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
|
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
|
||||||
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
|
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
|
||||||
@ -265,7 +262,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||||||
// kick off the indexer
|
// kick off the indexer
|
||||||
triggered(new Event(schedulerJobName(), now, now));
|
triggered(new Event(schedulerJobName(), now, now));
|
||||||
registerWithSchedulerJob();
|
registerWithSchedulerJob();
|
||||||
listener.onResponse(new StartTransformTaskAction.Response(true));
|
listener.onResponse(new StartTransformAction.Response(true));
|
||||||
},
|
},
|
||||||
exc -> {
|
exc -> {
|
||||||
auditor.warning(transform.getId(),
|
auditor.warning(transform.getId(),
|
||||||
@ -277,16 +274,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||||||
}
|
}
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Start the background indexer and set the task's state to started
|
|
||||||
* @param startingCheckpoint Set the current checkpoint to this value. If null the
|
|
||||||
* current checkpoint is not set
|
|
||||||
* @param force Whether to force start a failed task or not
|
|
||||||
* @param listener Started listener
|
|
||||||
*/
|
|
||||||
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
|
|
||||||
start(startingCheckpoint, force, true, listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void stop(boolean force) {
|
public synchronized void stop(boolean force) {
|
||||||
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
|
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user