mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 12:56:53 +00:00
[ML-DataFrame] rewrite start and stop to answer with acknowledged (#42589)
rewrite start and stop to answer with acknowledged fixes #42450
This commit is contained in:
parent
53f5d313cd
commit
345ff21ae5
@ -30,21 +30,19 @@ import java.util.List;
|
||||
|
||||
public class StartDataFrameTransformResponse extends AcknowledgedTasksResponse {
|
||||
|
||||
private static final String STARTED = "started";
|
||||
private static final String ACKNOWLEDGED = "acknowledged";
|
||||
|
||||
private static final ConstructingObjectParser<StartDataFrameTransformResponse, Void> PARSER =
|
||||
AcknowledgedTasksResponse.generateParser("start_data_frame_transform_response", StartDataFrameTransformResponse::new, STARTED);
|
||||
AcknowledgedTasksResponse.generateParser("start_data_frame_transform_response", StartDataFrameTransformResponse::new,
|
||||
ACKNOWLEDGED);
|
||||
|
||||
public static StartDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
public StartDataFrameTransformResponse(boolean started, @Nullable List<TaskOperationFailure> taskFailures,
|
||||
public StartDataFrameTransformResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
|
||||
@Nullable List<? extends ElasticsearchException> nodeFailures) {
|
||||
super(started, taskFailures, nodeFailures);
|
||||
super(acknowledged, taskFailures, nodeFailures);
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return isAcknowledged();
|
||||
}
|
||||
}
|
||||
|
@ -30,21 +30,18 @@ import java.util.List;
|
||||
|
||||
public class StopDataFrameTransformResponse extends AcknowledgedTasksResponse {
|
||||
|
||||
private static final String STOPPED = "stopped";
|
||||
private static final String ACKNOWLEDGED = "acknowledged";
|
||||
|
||||
private static final ConstructingObjectParser<StopDataFrameTransformResponse, Void> PARSER =
|
||||
AcknowledgedTasksResponse.generateParser("stop_data_frame_transform_response", StopDataFrameTransformResponse::new, STOPPED);
|
||||
private static final ConstructingObjectParser<StopDataFrameTransformResponse, Void> PARSER = AcknowledgedTasksResponse
|
||||
.generateParser("stop_data_frame_transform_response", StopDataFrameTransformResponse::new, ACKNOWLEDGED);
|
||||
|
||||
public static StopDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
public StopDataFrameTransformResponse(boolean stopped, @Nullable List<TaskOperationFailure> taskFailures,
|
||||
public StopDataFrameTransformResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
|
||||
@Nullable List<? extends ElasticsearchException> nodeFailures) {
|
||||
super(stopped, taskFailures, nodeFailures);
|
||||
super(acknowledged, taskFailures, nodeFailures);
|
||||
}
|
||||
|
||||
public boolean isStopped() {
|
||||
return isAcknowledged();
|
||||
}
|
||||
}
|
||||
|
@ -258,7 +258,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
||||
StartDataFrameTransformRequest startRequest = new StartDataFrameTransformRequest(id);
|
||||
StartDataFrameTransformResponse startResponse =
|
||||
execute(startRequest, client::startDataFrameTransform, client::startDataFrameTransformAsync);
|
||||
assertTrue(startResponse.isStarted());
|
||||
assertTrue(startResponse.isAcknowledged());
|
||||
assertThat(startResponse.getNodeFailures(), empty());
|
||||
assertThat(startResponse.getTaskFailures(), empty());
|
||||
|
||||
@ -271,7 +271,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
||||
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
|
||||
StopDataFrameTransformResponse stopResponse =
|
||||
execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
|
||||
assertTrue(stopResponse.isStopped());
|
||||
assertTrue(stopResponse.isAcknowledged());
|
||||
assertThat(stopResponse.getNodeFailures(), empty());
|
||||
assertThat(stopResponse.getTaskFailures(), empty());
|
||||
}
|
||||
@ -358,7 +358,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
|
||||
StartDataFrameTransformResponse startTransformResponse = execute(new StartDataFrameTransformRequest(id),
|
||||
client::startDataFrameTransform,
|
||||
client::startDataFrameTransformAsync);
|
||||
assertThat(startTransformResponse.isStarted(), is(true));
|
||||
assertThat(startTransformResponse.isAcknowledged(), is(true));
|
||||
assertBusy(() -> {
|
||||
GetDataFrameTransformStatsResponse response = execute(new GetDataFrameTransformStatsRequest(id),
|
||||
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
|
||||
|
@ -244,7 +244,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
|
||||
request, RequestOptions.DEFAULT);
|
||||
// end::start-data-frame-transform-execute
|
||||
|
||||
assertTrue(response.isStarted());
|
||||
assertTrue(response.isAcknowledged());
|
||||
}
|
||||
{
|
||||
// tag::stop-data-frame-transform-request
|
||||
@ -263,7 +263,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
|
||||
request, RequestOptions.DEFAULT);
|
||||
// end::stop-data-frame-transform-execute
|
||||
|
||||
assertTrue(response.isStopped());
|
||||
assertTrue(response.isAcknowledged());
|
||||
}
|
||||
{
|
||||
// tag::start-data-frame-transform-execute-listener
|
||||
|
@ -41,7 +41,7 @@ When the {dataframe-transform} starts, you receive the following results:
|
||||
[source,js]
|
||||
----
|
||||
{
|
||||
"started" : true
|
||||
"acknowledged" : true
|
||||
}
|
||||
----
|
||||
// TESTRESPONSE
|
||||
|
@ -61,7 +61,7 @@ When the {dataframe-transform} stops, you receive the following results:
|
||||
[source,js]
|
||||
----
|
||||
{
|
||||
"stopped" : true
|
||||
"acknowledged" : true
|
||||
}
|
||||
----
|
||||
// TESTRESPONSE
|
||||
|
@ -96,33 +96,33 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
|
||||
}
|
||||
|
||||
public static class Response extends BaseTasksResponse implements ToXContentObject {
|
||||
private final boolean started;
|
||||
private final boolean acknowledged;
|
||||
|
||||
public Response(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
started = in.readBoolean();
|
||||
acknowledged = in.readBoolean();
|
||||
}
|
||||
|
||||
public Response(boolean started) {
|
||||
public Response(boolean acknowledged) {
|
||||
super(Collections.emptyList(), Collections.emptyList());
|
||||
this.started = started;
|
||||
this.acknowledged = acknowledged;
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return started;
|
||||
public boolean isAcknowledged() {
|
||||
return acknowledged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(started);
|
||||
out.writeBoolean(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
toXContentCommon(builder, params);
|
||||
builder.field("started", started);
|
||||
builder.field("acknowledged", acknowledged);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
@ -137,12 +137,12 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
|
||||
return false;
|
||||
}
|
||||
Response response = (Response) obj;
|
||||
return started == response.started;
|
||||
return acknowledged == response.acknowledged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(started);
|
||||
return Objects.hash(acknowledged);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -158,40 +158,40 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
|
||||
|
||||
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
|
||||
|
||||
private final boolean stopped;
|
||||
private final boolean acknowledged;
|
||||
|
||||
public Response(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
stopped = in.readBoolean();
|
||||
acknowledged = in.readBoolean();
|
||||
}
|
||||
|
||||
public Response(boolean stopped) {
|
||||
public Response(boolean acknowledged) {
|
||||
super(Collections.emptyList(), Collections.emptyList());
|
||||
this.stopped = stopped;
|
||||
this.acknowledged = acknowledged;
|
||||
}
|
||||
|
||||
public Response(List<TaskOperationFailure> taskFailures,
|
||||
List<? extends ElasticsearchException> nodeFailures,
|
||||
boolean stopped) {
|
||||
boolean acknowledged) {
|
||||
super(taskFailures, nodeFailures);
|
||||
this.stopped = stopped;
|
||||
this.acknowledged = acknowledged;
|
||||
}
|
||||
|
||||
public boolean isStopped() {
|
||||
return stopped;
|
||||
public boolean isAcknowledged() {
|
||||
return acknowledged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(stopped);
|
||||
out.writeBoolean(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
toXContentCommon(builder, params);
|
||||
builder.field("stopped", stopped);
|
||||
builder.field("acknowledged", acknowledged);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
@ -203,12 +203,12 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
Response response = (Response) o;
|
||||
return stopped == response.stopped;
|
||||
return acknowledged == response.acknowledged;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(stopped);
|
||||
return Objects.hash(acknowledged);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
|
||||
REVIEWS_INDEX_NAME);
|
||||
|
||||
assertTrue(putDataFrameTransform(config, RequestOptions.DEFAULT).isAcknowledged());
|
||||
assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isStarted());
|
||||
assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
|
||||
|
||||
waitUntilCheckpoint(config.getId(), 1L);
|
||||
|
||||
|
@ -191,7 +191,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
||||
startTransformRequest.setOptions(expectWarnings(warnings));
|
||||
}
|
||||
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
|
||||
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));
|
||||
assertThat(startTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
}
|
||||
|
||||
protected void stopDataFrameTransform(String transformId, boolean force) throws Exception {
|
||||
@ -200,7 +200,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
|
||||
stopTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force));
|
||||
stopTransformRequest.addParameter(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true));
|
||||
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
|
||||
assertThat(stopTransformResponse.get("stopped"), equalTo(Boolean.TRUE));
|
||||
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
}
|
||||
|
||||
protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws Exception {
|
||||
|
@ -124,8 +124,8 @@ public class TransportStopDataFrameTransformAction extends
|
||||
}
|
||||
|
||||
// if tasks is empty allMatch is 'vacuously satisfied'
|
||||
boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
|
||||
return new StopDataFrameTransformAction.Response(allStopped);
|
||||
boolean allAcknowledged = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isAcknowledged);
|
||||
return new StopDataFrameTransformAction.Response(allAcknowledged);
|
||||
}
|
||||
|
||||
private ActionListener<StopDataFrameTransformAction.Response>
|
||||
|
@ -42,7 +42,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
---
|
||||
"Test start missing transform":
|
||||
@ -56,7 +56,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/
|
||||
@ -68,7 +68,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
- do:
|
||||
indices.get_mapping:
|
||||
index: airline-data-by-airline-start-stop
|
||||
@ -83,7 +83,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
- do:
|
||||
indices.get_mapping:
|
||||
index: airline-data-by-airline-start-stop
|
||||
@ -93,7 +93,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.get_data_frame_transform_stats:
|
||||
@ -107,7 +107,7 @@ teardown:
|
||||
data_frame.stop_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
wait_for_completion: true
|
||||
- match: { stopped: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.get_data_frame_transform_stats:
|
||||
@ -120,7 +120,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.get_data_frame_transform_stats:
|
||||
@ -142,7 +142,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.stop_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { stopped: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
---
|
||||
"Test start/stop only starts/stops specified transform":
|
||||
@ -161,7 +161,7 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.get_data_frame_transform_stats:
|
||||
@ -182,12 +182,12 @@ teardown:
|
||||
- do:
|
||||
data_frame.start_data_frame_transform:
|
||||
transform_id: "airline-transform-start-later"
|
||||
- match: { started: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.stop_data_frame_transform:
|
||||
transform_id: "airline-transform-start-stop"
|
||||
- match: { stopped: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.get_data_frame_transform_stats:
|
||||
@ -201,7 +201,7 @@ teardown:
|
||||
data_frame.stop_data_frame_transform:
|
||||
transform_id: "airline-transform-start-later"
|
||||
wait_for_completion: true
|
||||
- match: { stopped: true }
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
data_frame.delete_data_frame_transform:
|
||||
|
Loading…
x
Reference in New Issue
Block a user