[ML][Transforms] add wait_for_checkpoint flag to stop (#47935) (#48591)

Adds `wait_for_checkpoint` for `_stop` API.
This commit is contained in:
Benjamin Trent 2019-10-28 13:02:57 -04:00 committed by GitHub
parent 97f48168d9
commit 6ea59dd428
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 695 additions and 118 deletions

View File

@ -41,6 +41,7 @@ import static org.elasticsearch.client.RequestConverters.createEntity;
import static org.elasticsearch.client.transform.DeleteTransformRequest.FORCE;
import static org.elasticsearch.client.transform.GetTransformRequest.ALLOW_NO_MATCH;
import static org.elasticsearch.client.transform.PutTransformRequest.DEFER_VALIDATION;
import static org.elasticsearch.client.transform.StopTransformRequest.WAIT_FOR_CHECKPOINT;
final class TransformRequestConverters {
@ -135,6 +136,9 @@ final class TransformRequestConverters {
if (stopRequest.getAllowNoMatch() != null) {
request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString());
}
if (stopRequest.getWaitForCheckpoint() != null) {
request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString());
}
request.addParameters(params.asMap());
return request;
}

View File

@ -28,21 +28,23 @@ import java.util.Optional;
public class StopTransformRequest implements Validatable {
public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint";
private final String id;
private Boolean waitForCompletion;
private Boolean waitForCheckpoint;
private TimeValue timeout;
private Boolean allowNoMatch;
public StopTransformRequest(String id) {
this.id = id;
waitForCompletion = null;
timeout = null;
this(id, null, null, null);
}
public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) {
this.id = id;
this.waitForCompletion = waitForCompletion;
this.timeout = timeout;
this.waitForCheckpoint = waitForCheckpoint;
}
public String getId() {
@ -73,6 +75,14 @@ public class StopTransformRequest implements Validatable {
this.allowNoMatch = allowNoMatch;
}
public Boolean getWaitForCheckpoint() {
return waitForCheckpoint;
}
public void setWaitForCheckpoint(Boolean waitForCheckpoint) {
this.waitForCheckpoint = waitForCheckpoint;
}
@Override
public Optional<ValidationException> validate() {
if (id == null) {
@ -86,7 +96,7 @@ public class StopTransformRequest implements Validatable {
@Override
public int hashCode() {
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch);
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint);
}
@Override
@ -102,6 +112,7 @@ public class StopTransformRequest implements Validatable {
return Objects.equals(this.id, other.id)
&& Objects.equals(this.waitForCompletion, other.waitForCompletion)
&& Objects.equals(this.timeout, other.timeout)
&& Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint)
&& Objects.equals(this.allowNoMatch, other.allowNoMatch);
}

View File

@ -148,7 +148,7 @@ public class TransformIT extends ESRestHighLevelClientTestCase {
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().transform().stopTransform(
new StopTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
new StopTransformRequest(transformId, Boolean.TRUE, null, false), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {
@ -311,7 +311,7 @@ public class TransformIT extends ESRestHighLevelClientTestCase {
assertThat(taskState, oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING,
TransformStats.State.STOPPING, TransformStats.State.STOPPED));
StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null);
StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null, false);
StopTransformResponse stopResponse =
execute(stopRequest, client::stopTransform, client::stopTransformAsync);
assertTrue(stopResponse.isAcknowledged());

View File

@ -148,7 +148,12 @@ public class TransformRequestConvertersTests extends ESTestCase {
if (randomBoolean()) {
timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
}
StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue);
Boolean waitForCheckpoint = null;
if (randomBoolean()) {
waitForCheckpoint = randomBoolean();
}
StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint);
Request request = TransformRequestConverters.stopTransform(stopRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
@ -167,6 +172,12 @@ public class TransformRequestConvertersTests extends ESTestCase {
} else {
assertFalse(request.getParameters().containsKey("timeout"));
}
if (waitForCheckpoint != null) {
assertTrue(request.getParameters().containsKey("wait_for_checkpoint"));
assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint")));
} else {
assertFalse(request.getParameters().containsKey("wait_for_checkpoint"));
}
assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH));
stopRequest.setAllowNoMatch(randomBoolean());

View File

@ -81,7 +81,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().transform().stopTransform(
new StopTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
new StopTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
}
for (String transformId : transformsToClean) {

View File

@ -90,6 +90,10 @@ are no matches or only partial matches.
state completely stops. If set to `false`, the API returns immediately and the
indexer will be stopped asynchronously in the background. Defaults to `false`.
`wait_for_checkpoint`::
(Optional, boolean) If set to `true`, the transform will not completely stop
until the current checkpoint is completed. If set to `false`, the transform
stops as soon as possible. Defaults to `false`.
[[stop-transform-response-codes]]
==== {api-response-codes-title}

View File

@ -22,6 +22,7 @@ public final class TransformField {
public static final ParseField GROUP_BY = new ParseField("group_by");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
public static final ParseField STATS_FIELD = new ParseField("stats");
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.transform.action;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
@ -32,6 +33,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class StopTransformAction extends ActionType<StopTransformAction.Response> {
public static final StopTransformAction INSTANCE = new StopTransformAction();
@ -48,9 +51,15 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
private final boolean waitForCompletion;
private final boolean force;
private final boolean allowNoMatch;
private final boolean waitForCheckpoint;
private Set<String> expandedIds;
public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) {
public Request(String id,
boolean waitForCompletion,
boolean force,
@Nullable TimeValue timeout,
boolean allowNoMatch,
boolean waitForCheckpoint) {
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.waitForCompletion = waitForCompletion;
this.force = force;
@ -58,6 +67,7 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
// use the timeout value already present in BaseTasksRequest
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
this.allowNoMatch = allowNoMatch;
this.waitForCheckpoint = waitForCheckpoint;
}
public Request(StreamInput in) throws IOException {
@ -73,6 +83,11 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
} else {
this.allowNoMatch = true;
}
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
this.waitForCheckpoint = in.readBoolean();
} else {
this.waitForCheckpoint = false;
}
}
public String getId() {
@ -99,6 +114,10 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
return allowNoMatch;
}
public boolean isWaitForCheckpoint() {
return waitForCheckpoint;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -113,17 +132,27 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeBoolean(allowNoMatch);
}
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeBoolean(waitForCheckpoint);
}
}
@Override
public ActionRequestValidationException validate() {
if (force && waitForCheckpoint) {
return addValidationError(new ParameterizedMessage(
"cannot set both [{}] and [{}] to true",
TransformField.FORCE,
TransformField.WAIT_FOR_CHECKPOINT).getFormattedMessage(),
null);
}
return null;
}
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch);
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint);
}
@Override
@ -146,6 +175,7 @@ public class StopTransformAction extends ActionType<StopTransformAction.Response
Objects.equals(waitForCompletion, other.waitForCompletion) &&
Objects.equals(force, other.force) &&
Objects.equals(expandedIds, other.expandedIds) &&
Objects.equals(waitForCheckpoint, other.waitForCheckpoint) &&
allowNoMatch == other.allowNoMatch;
}

View File

@ -43,6 +43,8 @@ public class TransformState implements Task.Status, PersistentTaskState {
@Nullable
private NodeAttributes node;
private final boolean shouldStopAtNextCheckpoint;
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
@ -53,28 +55,38 @@ public class TransformState implements Task.Status, PersistentTaskState {
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField NODE = new ParseField("node");
public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<TransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
true,
args -> {
TransformTaskState taskState = (TransformTaskState) args[0];
IndexerState indexerState = (IndexerState) args[1];
Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
TransformIndexerPosition transformIndexerPosition = (TransformIndexerPosition) args[3];
true,
args -> {
TransformTaskState taskState = (TransformTaskState) args[0];
IndexerState indexerState = (IndexerState) args[1];
Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
TransformIndexerPosition transformIndexerPosition = (TransformIndexerPosition) args[3];
// BWC handling, translate current_position to position iff position isn't set
if (bwcCurrentPosition != null && transformIndexerPosition == null) {
transformIndexerPosition = new TransformIndexerPosition(bwcCurrentPosition, null);
}
// BWC handling, translate current_position to position iff position isn't set
if (bwcCurrentPosition != null && transformIndexerPosition == null) {
transformIndexerPosition = new TransformIndexerPosition(bwcCurrentPosition, null);
}
long checkpoint = (long) args[4];
String reason = (String) args[5];
TransformProgress progress = (TransformProgress) args[6];
NodeAttributes node = (NodeAttributes) args[7];
long checkpoint = (long) args[4];
String reason = (String) args[5];
TransformProgress progress = (TransformProgress) args[6];
NodeAttributes node = (NodeAttributes) args[7];
boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean)args[8];
return new TransformState(taskState, indexerState, transformIndexerPosition, checkpoint, reason, progress, node);
});
return new TransformState(taskState,
indexerState,
transformIndexerPosition,
checkpoint,
reason,
progress,
node,
shouldStopAtNextCheckpoint);
});
static {
PARSER.declareField(constructorArg(), p -> TransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
@ -85,15 +97,17 @@ public class TransformState implements Task.Status, PersistentTaskState {
PARSER.declareString(optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), TransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT);
PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT);
}
public TransformState(TransformTaskState taskState,
IndexerState indexerState,
@Nullable TransformIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable TransformProgress progress,
@Nullable NodeAttributes node) {
IndexerState indexerState,
@Nullable TransformIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable TransformProgress progress,
@Nullable NodeAttributes node,
boolean shouldStopAtNextCheckpoint) {
this.taskState = taskState;
this.indexerState = indexerState;
this.position = position;
@ -101,14 +115,25 @@ public class TransformState implements Task.Status, PersistentTaskState {
this.reason = reason;
this.progress = progress;
this.node = node;
this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint;
}
public TransformState(TransformTaskState taskState,
IndexerState indexerState,
@Nullable TransformIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable TransformProgress progress) {
IndexerState indexerState,
@Nullable TransformIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable TransformProgress progress,
@Nullable NodeAttributes node) {
this(taskState, indexerState, position, checkpoint, reason, progress, node, false);
}
public TransformState(TransformTaskState taskState,
IndexerState indexerState,
@Nullable TransformIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable TransformProgress progress) {
this(taskState, indexerState, position, checkpoint, reason, progress, null);
}
@ -129,6 +154,11 @@ public class TransformState implements Task.Status, PersistentTaskState {
} else {
node = null;
}
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
shouldStopAtNextCheckpoint = in.readBoolean();
} else {
shouldStopAtNextCheckpoint = false;
}
}
public TransformTaskState getTaskState() {
@ -164,6 +194,10 @@ public class TransformState implements Task.Status, PersistentTaskState {
return this;
}
public boolean shouldStopAtNextCheckpoint() {
return shouldStopAtNextCheckpoint;
}
public static TransformState fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
@ -190,6 +224,7 @@ public class TransformState implements Task.Status, PersistentTaskState {
if (node != null) {
builder.field(NODE.getPreferredName(), node);
}
builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint);
builder.endObject();
return builder;
}
@ -214,6 +249,9 @@ public class TransformState implements Task.Status, PersistentTaskState {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalWriteable(node);
}
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeBoolean(shouldStopAtNextCheckpoint);
}
}
@Override
@ -234,12 +272,13 @@ public class TransformState implements Task.Status, PersistentTaskState {
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason) &&
Objects.equals(this.progress, that.progress) &&
Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) &&
Objects.equals(this.node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint);
}
@Override

View File

@ -92,8 +92,8 @@ public class TransformStats implements Writeable, ToXContentObject {
public TransformStats(String id, State state, @Nullable String reason,
@Nullable NodeAttributes node, TransformIndexerStats stats,
TransformCheckpointingInfo checkpointingInfo) {
@Nullable NodeAttributes node, TransformIndexerStats stats,
TransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
this.state = Objects.requireNonNull(state);
this.reason = reason;

View File

@ -24,7 +24,7 @@ public final class TransformInternalIndexConstants {
// internal index
// version is not a rollover pattern, however padded because sort is string based
public static final String INDEX_VERSION = "003";
public static final String INDEX_VERSION = "004";
public static final String INDEX_PATTERN = ".transform-internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;

View File

@ -23,7 +23,12 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
@Override
protected Request createTestInstance() {
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout, randomBoolean());
Request request = new Request(randomAlphaOfLengthBetween(1, 10),
randomBoolean(),
randomBoolean(),
timeout,
randomBoolean(),
randomBoolean());
if (randomBoolean()) {
request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false))));
}
@ -40,9 +45,10 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
boolean waitForCompletion = randomBoolean();
boolean force = randomBoolean();
boolean allowNoMatch = randomBoolean();
boolean waitForCheckpoint = randomBoolean();
Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch);
Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch);
Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint);
Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint);
assertNotEquals(r1,r2);
assertNotEquals(r1.hashCode(),r2.hashCode());
@ -52,20 +58,20 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest
String dataFrameId = "dataframe-id";
Task dataFrameTask = new Task(1L, "persistent", "action",
TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
Request request = new Request("unrelated", false, false, null, false);
Request request = new Request("unrelated", false, false, null, false, false);
request.setExpandedIds(new HashSet<>(Arrays.asList("foo", "bar")));
assertFalse(request.match(dataFrameTask));
Request matchingRequest = new Request(dataFrameId, false, false, null, false);
Request matchingRequest = new Request(dataFrameId, false, false, null, false, false);
matchingRequest.setExpandedIds(Collections.singleton(dataFrameId));
assertTrue(matchingRequest.match(dataFrameTask));
Task notADataFrameTask = new Task(1L, "persistent", "action",
"some other task, say monitoring",
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
"some other task, say monitoring",
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
assertFalse(matchingRequest.match(notADataFrameTask));
}
}

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -26,7 +29,8 @@ public class TransformStateTests extends AbstractSerializingTestCase<TransformSt
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomTransformProgress(),
randomBoolean() ? null : randomNodeAttributes());
randomBoolean() ? null : randomNodeAttributes(),
randomBoolean());
}
@Override
@ -53,4 +57,24 @@ public class TransformStateTests extends AbstractSerializingTestCase<TransformSt
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> !field.isEmpty();
}
public void testBackwardsSerialization() throws IOException {
TransformState state = new TransformState(randomFrom(TransformTaskState.values()),
randomFrom(IndexerState.values()),
TransformIndexerPositionTests.randomTransformIndexerPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomTransformProgress(),
randomBoolean() ? null : randomNodeAttributes(),
false); // Will be false after BWC deserialization
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_5_0);
state.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_5_0);
TransformState streamedState = new TransformState(in);
assertEquals(state, streamedState);
}
}
}
}

View File

@ -20,21 +20,31 @@
}
]
},
"params":{
"wait_for_completion":{
"type":"boolean",
"required":false,
"description":"Whether to wait for the transform to fully stop before returning or not. Default to false"
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "Whether to force stop a failed transform or not. Default to false"
},
"timeout":{
"type":"time",
"required":false,
"description":"Controls the time to wait until the transform has stopped. Default to 30 seconds"
"wait_for_completion": {
"type": "boolean",
"required": false,
"description": "Whether to wait for the transform to fully stop before returning or not. Default to false"
},
"allow_no_match":{
"type":"boolean",
"required":false,
"description":"Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)"
"timeout": {
"type": "time",
"required": false,
"description": "Controls the time to wait until the transform has stopped. Default to 30 seconds"
},
"allow_no_match": {
"type": "boolean",
"required": false,
"description": "Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)"
},
"wait_for_checkpoint": {
"type": "boolean",
"required": false,
"description": "Whether to wait for the transform to reach a checkpoint before stopping. Default to false"
}
}
}

View File

@ -51,6 +51,7 @@ setup:
teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop"
timeout: "10m"
wait_for_completion: true
@ -59,6 +60,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop-continuous"
timeout: "10m"
wait_for_completion: true
@ -131,6 +133,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop"
wait_for_completion: true
- match: { acknowledged: true }
@ -168,6 +171,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop-continuous"
wait_for_completion: true
- match: { acknowledged: true }
@ -193,6 +197,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop-continuous"
wait_for_completion: true
- match: { acknowledged: true }
@ -201,25 +206,38 @@ teardown:
- do:
catch: missing
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "missing-transform"
---
"Test stop missing transform by expression":
- do:
transform.stop_transform:
wait_for_checkpoint: false
allow_no_match: true
transform_id: "missing-transform*"
- do:
catch: missing
transform.stop_transform:
wait_for_checkpoint: false
allow_no_match: false
transform_id: "missing-transform*"
---
"Test stop transform with force and wait_for_checkpoint true ":
- do:
catch: /cannot set both \[force\] and \[wait_for_checkpoint\] to true/
transform.stop_transform:
wait_for_checkpoint: true
force: true
transform_id: "airline-transform-start-stop-continuous"
---
"Test stop already stopped transform":
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }
@ -269,6 +287,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-stop-continuous"
wait_for_completion: true
- match: { acknowledged: true }
@ -283,6 +302,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-start-later"
wait_for_completion: true
- match: { acknowledged: true }
@ -317,6 +337,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "_all"
wait_for_completion: true
- match: { acknowledged: true }

View File

@ -33,6 +33,7 @@ setup:
teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-stats"
wait_for_completion: true
@ -252,6 +253,7 @@ teardown:
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "airline-transform-stats-continuous"
wait_for_completion: true

View File

@ -235,6 +235,49 @@ public class TransformIT extends TransformIntegTestCase {
deleteTransform(config.getId());
}
public void testStopWaitForCheckpoint() throws Exception {
String indexName = "wait-for-checkpoint-reviews";
String transformId = "data-frame-transform-wait-for-checkpoint";
createReviewsIndex(indexName, 1000);
Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null));
groups.put("by-user", TermsGroupSource.builder().setField("user_id").build());
groups.put("by-business", TermsGroupSource.builder().setField("business_id").build());
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
TransformConfig config = createTransformConfigBuilder(transformId,
groups,
aggs,
"reviews-by-user-business-day",
QueryBuilders.matchAllQuery(),
indexName)
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.build();
assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged());
assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
// waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop
stopTransform(transformId, false, null, true);
// Wait until the first checkpoint
waitUntilCheckpoint(config.getId(), 1L);
// Even though we are continuous, we should be stopped now as we needed to stop at the first checkpoint
assertBusy(() -> {
TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0);
assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED));
assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(1000L));
});
stopTransform(config.getId());
deleteTransform(config.getId());
}
private void indexMoreDocs(long timestamp, long userId, String index) throws Exception {
BulkRequest bulk = new BulkRequest(index);
for (int i = 0; i < 25; i++) {

View File

@ -86,8 +86,16 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
}
protected StopTransformResponse stopTransform(String id) throws IOException {
return stopTransform(id, true, null, false);
}
protected StopTransformResponse stopTransform(String id,
boolean waitForCompletion,
TimeValue timeout,
boolean waitForCheckpoint) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
return restClient.transform().stopTransform(new StopTransformRequest(id, true, null), RequestOptions.DEFAULT);
return restClient.transform()
.stopTransform(new StopTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), RequestOptions.DEFAULT);
}
protected StartTransformResponse startTransform(String id, RequestOptions options) throws IOException {
@ -298,7 +306,7 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
.append("\"}");
bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
if (i % 50 == 0) {
if (i % 100 == 0) {
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
bulk = new BulkRequest(indexName);

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.transform.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
@ -863,6 +864,61 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertEquals(101, ((List<?>)XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0));
}
public void testContinuousStopWaitForCheckpoint() throws Exception {
Request updateLoggingLevels = new Request("PUT", "/_cluster/settings");
updateLoggingLevels.setJsonEntity(
"{\"transient\": {" +
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}");
client().performRequest(updateLoggingLevels);
String indexName = "continuous_reviews_wait_for_checkpoint";
createReviewsIndex(indexName);
String transformId = "simple_continuous_pivot_wait_for_checkpoint";
String dataFrameIndex = "pivot_reviews_continuous_wait_for_checkpoint";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex);
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
String config = "{"
+ " \"source\": {\"index\":\"" + indexName + "\"},"
+ " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForContinuousTransform(transformId, dataFrameIndex, null);
assertTrue(indexExists(dataFrameIndex));
assertBusy(() -> {
try {
stopTransform(transformId,false, true);
} catch (ResponseException e) {
// We get a conflict sometimes depending on WHEN we try to write the state, should eventually pass though
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(200));
}
});
// get and check some users
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
deleteIndex(indexName);
}
private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

View File

@ -260,10 +260,14 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void stopTransform(String transformId, boolean force) throws Exception {
// start the transform
stopTransform(transformId, force, false);
}
protected void stopTransform(String transformId, boolean force, boolean waitForCheckpoint) throws Exception {
final Request stopTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_stop", null);
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true));
stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint));
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}

View File

@ -91,7 +91,7 @@ public class TransportDeleteTransformAction extends TransportMasterNodeAction<Re
executeAsyncWithOrigin(client,
TRANSFORM_ORIGIN,
StopTransformAction.INSTANCE,
new StopTransformAction.Request(request.getId(), true, true, null, true),
new StopTransformAction.Request(request.getId(), true, true, null, true, false),
ActionListener.wrap(
r -> stopTransformActionListener.onResponse(null),
stopTransformActionListener::onFailure));

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -93,25 +94,14 @@ public class TransportGetTransformStatsAction extends
ClusterState state = clusterService.state();
String nodeId = state.nodes().getLocalNode().getId();
if (task.isCancelled() == false) {
TransformState transformState = task.getState();
task.getCheckpointingInfo(transformCheckpointService, ActionListener.wrap(
checkpointingInfo -> listener.onResponse(new Response(
Collections.singletonList(new TransformStats(task.getTransformId(),
TransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()),
transformState.getReason(),
null,
task.getStats(),
checkpointingInfo)),
Collections.singletonList(deriveStats(task, checkpointingInfo)),
1L)),
e -> {
logger.warn("Failed to retrieve checkpointing info for transform [" + task.getTransformId() + "]", e);
listener.onResponse(new Response(
Collections.singletonList(new TransformStats(task.getTransformId(),
TransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()),
transformState.getReason(),
null,
task.getStats(),
TransformCheckpointingInfo.EMPTY)),
Collections.singletonList(deriveStats(task, null)),
1L,
Collections.emptyList(),
Collections.singletonList(new FailedNodeException(nodeId, "Failed to retrieve checkpointing info", e))));
@ -170,6 +160,26 @@ public class TransportGetTransformStatsAction extends
}
}
static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpointingInfo checkpointingInfo) {
TransformState transformState = task.getState();
TransformStats.State derivedState = TransformStats.State.fromComponents(transformState.getTaskState(),
transformState.getIndexerState());
String reason = transformState.getReason();
if (transformState.shouldStopAtNextCheckpoint() &&
derivedState.equals(TransformStats.State.STOPPED) == false &&
derivedState.equals(TransformStats.State.FAILED) == false) {
derivedState = TransformStats.State.STOPPING;
reason = reason.isEmpty() ? "transform is set to stop at the next checkpoint" : reason;
}
return new TransformStats(
task.getTransformId(),
derivedState,
reason,
null,
task.getStats(),
checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo);
}
private void collectStatsForTransformsWithoutTasks(Request request,
Response response,
ActionListener<Response> listener) {

View File

@ -153,13 +153,24 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
}
if (ids.contains(transformTask.getTransformId())) {
try {
transformTask.stop(request.isForce());
} catch (ElasticsearchException ex) {
listener.onFailure(ex);
return;
}
listener.onResponse(new Response(Boolean.TRUE));
transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(
r -> {
try {
transformTask.stop(request.isForce(), request.isWaitForCheckpoint());
listener.onResponse(new Response(true));
} catch (ElasticsearchException ex) {
listener.onFailure(ex);
}
},
e -> listener.onFailure(
new ElasticsearchStatusException(
"Failed to update transform task [{}] state value should_stop_at_checkpoint from [{}] to [{}]",
RestStatus.CONFLICT,
transformTask.getTransformId(),
transformTask.getState().shouldStopAtNextCheckpoint(),
request.isWaitForCheckpoint()))
)
);
} else {
listener.onFailure(new RuntimeException("ID of transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));

View File

@ -55,6 +55,8 @@ public final class TransformInternalIndex {
* progress::docs_processed, progress::docs_indexed,
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
* stats::exponential_avg_documents_processed
*
* version 4 (7.6): state::should_stop_at_checkpoint
*/
// constants for mappings
@ -72,6 +74,7 @@ public final class TransformInternalIndex {
public static final String DOUBLE = "double";
public static final String LONG = "long";
public static final String KEYWORD = "keyword";
public static final String BOOLEAN = "boolean";
public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
@ -179,6 +182,9 @@ public final class TransformInternalIndex {
.startObject(TransformState.INDEXER_STATE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(TransformState.SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName())
.field(TYPE, BOOLEAN)
.endObject()
.startObject(TransformState.CURRENT_POSITION.getPreferredName())
.field(ENABLED, false)
.endObject()

View File

@ -28,13 +28,15 @@ public class RestStopTransformAction extends BaseRestHandler {
boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false);
boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false);
boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), false);
StopTransformAction.Request request = new StopTransformAction.Request(id,
waitForCompletion,
force,
timeout,
allowNoMatch);
allowNoMatch,
waitForCheckpoint);
return channel -> client.execute(StopTransformAction.INSTANCE, request,
new RestToXContentListener<>(channel));

View File

@ -36,13 +36,15 @@ public class RestStopTransformActionDeprecated extends BaseRestHandler {
boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false);
boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false);
boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), false);
StopTransformAction.Request request = new StopTransformAction.Request(id,
waitForCompletion,
force,
timeout,
allowNoMatch);
allowNoMatch,
waitForCheckpoint);
return channel -> client.execute(StopTransformActionDeprecated.INSTANCE, request,
new RestToXContentListener<>(channel));

View File

@ -64,6 +64,7 @@ class ClientTransformIndexer extends TransformIndexer {
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
private volatile boolean shouldStopAtCheckpoint = false;
private volatile Instant changesLastDetectedAt;
ClientTransformIndexer(TransformConfigManager transformsConfigManager,
@ -78,7 +79,8 @@ class ClientTransformIndexer extends TransformIndexer {
TransformProgress transformProgress,
TransformCheckpoint lastCheckpoint,
TransformCheckpoint nextCheckpoint,
TransformTask parentTask) {
TransformTask parentTask,
boolean shouldStopAtCheckpoint) {
super(ExceptionsHelper.requireNonNull(parentTask, "parentTask")
.getThreadPool()
.executor(ThreadPool.Names.GENERIC),
@ -97,6 +99,50 @@ class ClientTransformIndexer extends TransformIndexer {
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.transformTask = parentTask;
this.failureCount = new AtomicInteger(0);
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
}
boolean shouldStopAtCheckpoint() {
return shouldStopAtCheckpoint;
}
void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) {
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
}
void persistShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener<Void> shouldStopAtCheckpointListener) {
if (this.shouldStopAtCheckpoint == shouldStopAtCheckpoint ||
getState() == IndexerState.STOPPED ||
getState() == IndexerState.STOPPING) {
shouldStopAtCheckpointListener.onResponse(null);
return;
}
TransformState state = new TransformState(
transformTask.getTaskState(),
getState(),
getPosition(),
transformTask.getCheckpoint(),
transformTask.getStateReason(),
getProgress(),
null, //Node attributes
shouldStopAtCheckpoint);
doSaveState(state,
ActionListener.wrap(
r -> {
// We only want to update this internal value if it is persisted as such
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]",
getJobId(),
shouldStopAtCheckpoint);
shouldStopAtCheckpointListener.onResponse(null);
},
statsExc -> {
logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]",
getJobId(),
shouldStopAtCheckpoint);
shouldStopAtCheckpointListener.onFailure(statsExc);
}
));
}
@Override
@ -297,6 +343,21 @@ class ClientTransformIndexer extends TransformIndexer {
return;
}
boolean shouldStopAtCheckpoint = shouldStopAtCheckpoint();
// If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states
// 1. We have just called `onFinish` completing our request, but `shouldStopAtCheckpoint` was set to `true` before our check
// there and now
// 2. We are on the very first run of a NEW checkpoint and got here either through a failure, or the very first save state call.
//
// In either case, we should stop so that we guarantee a consistent state and that there are no partially completed checkpoints
if (shouldStopAtCheckpoint && initialRun() && indexerState.equals(IndexerState.STARTED)) {
indexerState = IndexerState.STOPPED;
auditor.info(transformConfig.getId(), "Transform is no longer in the middle of a checkpoint, initiating stop.");
logger.info("[{}] transform is no longer in the middle of a checkpoint, initiating stop.",
transformConfig.getId());
}
// This means that the indexer was triggered to discover changes, found none, and exited early.
// If the state is `STOPPED` this means that TransformTask#stop was called while we were checking for changes.
// Allow the stop call path to continue
@ -321,6 +382,12 @@ class ClientTransformIndexer extends TransformIndexer {
// OR we called `doSaveState` manually as the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
if (indexerState.equals(IndexerState.STOPPED)) {
// If we are going to stop after the state is saved, we should NOT persist `shouldStopAtCheckpoint: true` as this may
// cause problems if the task starts up again.
// Additionally, we don't have to worry about inconsistency with the ClusterState (if it is persisted there) as the
// when we stop, we mark the task as complete and that state goes away.
shouldStopAtCheckpoint = false;
// We don't want adjust the stored taskState because as soon as it is `STOPPED` a user could call
// .start again.
taskState = TransformTaskState.STOPPED;
@ -332,9 +399,19 @@ class ClientTransformIndexer extends TransformIndexer {
position,
transformTask.getCheckpoint(),
transformTask.getStateReason(),
getProgress());
getProgress(),
null,
shouldStopAtCheckpoint);
logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString());
doSaveState(state, ActionListener.wrap(
r -> next.run(),
e -> next.run()
));
}
private void doSaveState(TransformState state, ActionListener<Void> listener) {
// This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex();
@ -356,7 +433,7 @@ class ClientTransformIndexer extends TransformIndexer {
transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap(
nil -> {
logger.trace("[{}] deleted old transform stats and state document", getJobId());
next.run();
listener.onResponse(null);
},
e -> {
String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.",
@ -364,11 +441,11 @@ class ClientTransformIndexer extends TransformIndexer {
logger.warn(msg, e);
// If we have failed, we should attempt the clean up again later
oldStatsCleanedUp.set(false);
next.run();
listener.onResponse(null);
}
));
} else {
next.run();
listener.onResponse(null);
}
},
statsExc -> {
@ -381,7 +458,7 @@ class ClientTransformIndexer extends TransformIndexer {
if (state.getTaskState().equals(TransformTaskState.STOPPED)) {
transformTask.shutdown();
}
next.run();
listener.onFailure(statsExc);
}
));
}
@ -404,6 +481,9 @@ class ClientTransformIndexer extends TransformIndexer {
// This indicates an early exit since no changes were found.
// So, don't treat this like a checkpoint being completed, as no work was done.
if (hasSourceChanged == false) {
if (shouldStopAtCheckpoint) {
stop();
}
listener.onResponse(null);
return;
}
@ -447,6 +527,9 @@ class ClientTransformIndexer extends TransformIndexer {
logger.debug(
"[{}] finished indexing for transform checkpoint [{}].", getJobId(), checkpoint);
auditBulkFailures = true;
if (shouldStopAtCheckpoint) {
stop();
}
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);

View File

@ -34,6 +34,7 @@ class ClientTransformIndexerBuilder {
private TransformProgress progress;
private TransformCheckpoint lastCheckpoint;
private TransformCheckpoint nextCheckpoint;
private boolean shouldStopAtCheckpoint;
ClientTransformIndexerBuilder() {
this.initialStats = new TransformIndexerStats();
@ -54,7 +55,13 @@ class ClientTransformIndexerBuilder {
this.progress,
this.lastCheckpoint,
this.nextCheckpoint,
parentTask);
parentTask,
this.shouldStopAtCheckpoint);
}
ClientTransformIndexerBuilder setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) {
this.shouldStopAtCheckpoint = shouldStopAtCheckpoint;
return this;
}
ClientTransformIndexerBuilder setClient(Client client) {
@ -120,4 +127,4 @@ class ClientTransformIndexerBuilder {
this.nextCheckpoint = nextCheckpoint;
return this;
}
}
}

View File

@ -217,16 +217,18 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
// Since we have not set the value for this yet, it SHOULD be null
buildTask.updateSeqNoPrimaryTermAndIndex(null, seqNoPrimaryTermAndIndex);
logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString());
TransformState transformState = stateAndStats.getTransformState();
indexerBuilder.setInitialStats(stateAndStats.getTransformStats())
.setInitialPosition(stateAndStats.getTransformState().getPosition())
.setProgress(stateAndStats.getTransformState().getProgress())
.setIndexerState(currentIndexerState(stateAndStats.getTransformState()));
.setIndexerState(currentIndexerState(transformState))
.setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint());
logger.debug("[{}] Loading existing state: [{}], position [{}]",
transformId,
stateAndStats.getTransformState(),
stateAndStats.getTransformState().getPosition());
stateHolder.set(stateAndStats.getTransformState());
stateHolder.set(transformState);
final long lastCheckpoint = stateHolder.get().getCheckpoint();
if (lastCheckpoint == 0) {

View File

@ -141,7 +141,9 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
initialPosition,
currentCheckpoint.get(),
stateReason.get(),
null);
null,
null,
false);
} else {
return new TransformState(
taskState.get(),
@ -149,7 +151,9 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
indexer.get().getPosition(),
currentCheckpoint.get(),
stateReason.get(),
getIndexer().getProgress());
getIndexer().getProgress(),
null,
getIndexer().shouldStopAtCheckpoint());
}
}
@ -247,7 +251,9 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
getIndexer().getPosition(),
currentCheckpoint.get(),
null,
getIndexer().getProgress());
getIndexer().getProgress(),
null,
getIndexer().shouldStopAtCheckpoint());
logger.info("[{}] updating state for transform to [{}].", transform.getId(), state.toString());
// Even though the indexer information is persisted to an index, we still need TransformTaskState in the clusterstate
@ -275,8 +281,34 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
));
}
public synchronized void stop(boolean force) {
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
/**
* This sets the flag for the task to stop at the next checkpoint.
*
* If first persists the flag to cluster state, and then mutates the local variable.
*
* It only persists to cluster state if the value is different than what is currently held in memory.
* @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not
* @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to the state index.
*/
public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint,
ActionListener<Void> shouldStopAtCheckpointListener) {
logger.debug("[{}] attempted to set task to stop at checkpoint [{}] with state [{}]",
getTransformId(),
shouldStopAtCheckpoint,
getState());
if (taskState.get() != TransformTaskState.STARTED || getIndexer() == null) {
shouldStopAtCheckpointListener.onResponse(null);
return;
}
getIndexer().persistShouldStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener);
}
public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) {
logger.debug("[{}] stop called with force [{}], shouldStopAtCheckpoint [{}], state [{}]",
getTransformId(),
force,
shouldStopAtCheckpoint,
getState());
if (getIndexer() == null) {
// If there is no indexer the task has not been triggered
// but it still needs to be stopped and removed
@ -296,16 +328,23 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
RestStatus.CONFLICT);
}
IndexerState state = getIndexer().stop();
stateReason.set(null);
// No reason to keep it in the potentially failed state.
// Since we have called `stop` against the indexer, we have no more fear of triggering again.
// But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be
// executed while we are wrapping up.
taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED);
if (state == IndexerState.STOPPED) {
getIndexer().onStop();
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
boolean wasFailed = taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED);
// shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after).
// if it is false, stop immediately
if (shouldStopAtCheckpoint == false ||
// If state was in a failed state, we should stop immediately as we will never reach the next checkpoint
wasFailed ||
// If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint,
// or has yet to even start one.
// Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time).
(getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) {
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().onStop();
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {});
}
}
}
@ -400,6 +439,12 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
// We should not keep retrying. Either the task will be stopped, or started
// If it is started again, it is registered again.
deregisterSchedulerJob();
// The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again,
// we should set this flag to false.
if (getIndexer() != null) {
getIndexer().setShouldStopAtCheckpoint(false);
}
// The end user should see that the task is in a failed state, and attempt to stop it again but with force=true
taskState.set(TransformTaskState.FAILED);
stateReason.set(reason);
TransformState newState = getState();

View File

@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.action;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import java.time.Instant;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransportGetTransformStatsActionTests extends ESTestCase {
private TransformTask task = mock(TransformTask.class);
public void testDeriveStatsStopped() {
String transformId = "transform-with-stats";
String reason = "";
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
TransformState stoppedState =
new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, stoppedState, stats);
TransformCheckpointingInfo info = new TransformCheckpointingInfo(
new TransformCheckpointStats(1, null, null, 1, 1),
new TransformCheckpointStats(2, null, null, 2, 5),
2,
Instant.now());
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, info)));
reason = "foo";
stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, stoppedState, stats);
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info)));
}
public void testDeriveStatsFailed() {
String transformId = "transform-with-stats";
String reason = "";
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
TransformState failedState =
new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, failedState, stats);
TransformCheckpointingInfo info = new TransformCheckpointingInfo(
new TransformCheckpointStats(1, null, null, 1, 1),
new TransformCheckpointStats(2, null, null, 2, 5),
2,
Instant.now());
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, info)));
reason = "the task is failed";
failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, failedState, stats);
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info)));
}
public void testDeriveStats() {
String transformId = "transform-with-stats";
String reason = "";
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
TransformState runningState =
new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, runningState, stats);
TransformCheckpointingInfo info = new TransformCheckpointingInfo(
new TransformCheckpointStats(1, null, null, 1, 1),
new TransformCheckpointStats(2, null, null, 2, 5),
2,
Instant.now());
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING,
"transform is set to stop at the next checkpoint", null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING,
"transform is set to stop at the next checkpoint", null, stats, info)));
reason = "foo";
runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, runningState, stats);
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info)));
// Stop at next checkpoint is false.
runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false);
withIdStateAndStats(transformId, runningState, stats);
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info)));
}
private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) {
when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(state);
when(task.getStats()).thenReturn(stats);
}
}

View File

@ -67,7 +67,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()),
parentTask);
parentTask,
false);
List<Boolean> shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList());

View File

@ -276,6 +276,6 @@ setup:
- do:
indices.get_mapping:
index: .transform-internal-003
- match: { \.transform-internal-003.mappings.dynamic: "false" }
- match: { \.transform-internal-003.mappings.properties.id.type: "keyword" }
index: .transform-internal-004
- match: { \.transform-internal-004.mappings.dynamic: "false" }
- match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }