[ML] Return assigned node in start/open job/datafeed response (#55570)

Adds a "node" field to the response from the following endpoints:

1. Open anomaly detection job
2. Start datafeed
3. Start data frame analytics job

If the job or datafeed is assigned to a node immediately then
this field will return the ID of that node.

In the case where a job or datafeed is opened or started lazily
the node field will contain an empty string.  Clients that want
to test whether a job or datafeed was opened or started lazily
can therefore check for this.

Backport of #55473
This commit is contained in:
David Roberts 2020-04-22 12:06:53 +01:00 committed by GitHub
parent e99ef3542c
commit da5aeb8be7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 528 additions and 120 deletions

View File

@ -108,6 +108,7 @@ import org.elasticsearch.client.ml.RevertModelSnapshotRequest;
import org.elasticsearch.client.ml.RevertModelSnapshotResponse; import org.elasticsearch.client.ml.RevertModelSnapshotResponse;
import org.elasticsearch.client.ml.SetUpgradeModeRequest; import org.elasticsearch.client.ml.SetUpgradeModeRequest;
import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StartDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
@ -2138,12 +2139,12 @@ public final class MachineLearningClient {
* @return action acknowledgement * @return action acknowledgement
* @throws IOException when there is a serialization issue sending the request or receiving the response * @throws IOException when there is a serialization issue sending the request or receiving the response
*/ */
public AcknowledgedResponse startDataFrameAnalytics(StartDataFrameAnalyticsRequest request, public StartDataFrameAnalyticsResponse startDataFrameAnalytics(StartDataFrameAnalyticsRequest request,
RequestOptions options) throws IOException { RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::startDataFrameAnalytics, MLRequestConverters::startDataFrameAnalytics,
options, options,
AcknowledgedResponse::fromXContent, StartDataFrameAnalyticsResponse::fromXContent,
Collections.emptySet()); Collections.emptySet());
} }
@ -2160,11 +2161,11 @@ public final class MachineLearningClient {
* @return cancellable that may be used to cancel the request * @return cancellable that may be used to cancel the request
*/ */
public Cancellable startDataFrameAnalyticsAsync(StartDataFrameAnalyticsRequest request, RequestOptions options, public Cancellable startDataFrameAnalyticsAsync(StartDataFrameAnalyticsRequest request, RequestOptions options,
ActionListener<AcknowledgedResponse> listener) { ActionListener<StartDataFrameAnalyticsResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request, return restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::startDataFrameAnalytics, MLRequestConverters::startDataFrameAnalytics,
options, options,
AcknowledgedResponse::fromXContent, StartDataFrameAnalyticsResponse::fromXContent,
listener, listener,
Collections.emptySet()); Collections.emptySet());
} }

View File

@ -33,18 +33,23 @@ import java.util.Objects;
public class OpenJobResponse implements ToXContentObject { public class OpenJobResponse implements ToXContentObject {
private static final ParseField OPENED = new ParseField("opened"); private static final ParseField OPENED = new ParseField("opened");
private static final ParseField NODE = new ParseField("node");
public static final ConstructingObjectParser<OpenJobResponse, Void> PARSER = public static final ConstructingObjectParser<OpenJobResponse, Void> PARSER =
new ConstructingObjectParser<>("open_job_response", true, (a) -> new OpenJobResponse((Boolean)a[0])); new ConstructingObjectParser<>("open_job_response", true,
(a) -> new OpenJobResponse((Boolean) a[0], (String) a[1]));
static { static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), OPENED); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), OPENED);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), NODE);
} }
private boolean opened; private final boolean opened;
private final String node;
OpenJobResponse(boolean opened) { OpenJobResponse(boolean opened, String node) {
this.opened = opened; this.opened = opened;
this.node = node;
} }
public static OpenJobResponse fromXContent(XContentParser parser) throws IOException { public static OpenJobResponse fromXContent(XContentParser parser) throws IOException {
@ -60,6 +65,18 @@ public class OpenJobResponse implements ToXContentObject {
return opened; return opened;
} }
/**
* The node that the job was assigned to
*
* @return The ID of a node if the job was assigned to a node. If an empty string is returned
* it means the job was allowed to open lazily and has not yet been assigned to a node.
* If <code>null</code> is returned it means the server version is too old to return node
* information.
*/
public String getNode() {
return node;
}
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
if (this == other) { if (this == other) {
@ -71,18 +88,22 @@ public class OpenJobResponse implements ToXContentObject {
} }
OpenJobResponse that = (OpenJobResponse) other; OpenJobResponse that = (OpenJobResponse) other;
return isOpened() == that.isOpened(); return opened == that.opened
&& Objects.equals(node, that.node);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(isOpened()); return Objects.hash(opened, node);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(OPENED.getPreferredName(), opened); builder.field(OPENED.getPreferredName(), opened);
if (node != null) {
builder.field(NODE.getPreferredName(), node);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
/**
* Response indicating if the Machine Learning Datafeed is now started or not
*/
public class StartDataFrameAnalyticsResponse extends AcknowledgedResponse {
private static final ParseField NODE = new ParseField("node");
public static final ConstructingObjectParser<StartDataFrameAnalyticsResponse, Void> PARSER =
new ConstructingObjectParser<>(
"start_data_frame_analytics_response",
true,
(a) -> new StartDataFrameAnalyticsResponse((Boolean) a[0], (String) a[1]));
static {
declareAcknowledgedField(PARSER);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), NODE);
}
private final String node;
public StartDataFrameAnalyticsResponse(boolean acknowledged, String node) {
super(acknowledged);
this.node = node;
}
public static StartDataFrameAnalyticsResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
/**
* The node that the job was assigned to
*
* @return The ID of a node if the job was assigned to a node. If an empty string is returned
* it means the job was allowed to open lazily and has not yet been assigned to a node.
* If <code>null</code> is returned it means the server version is too old to return node
* information.
*/
public String getNode() {
return node;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
StartDataFrameAnalyticsResponse that = (StartDataFrameAnalyticsResponse) other;
return isAcknowledged() == that.isAcknowledged()
&& Objects.equals(node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), node);
}
@Override
public void addCustomFields(XContentBuilder builder, Params params) throws IOException {
if (node != null) {
builder.field(NODE.getPreferredName(), node);
}
}
}

View File

@ -33,21 +33,25 @@ import java.util.Objects;
public class StartDatafeedResponse implements ToXContentObject { public class StartDatafeedResponse implements ToXContentObject {
private static final ParseField STARTED = new ParseField("started"); private static final ParseField STARTED = new ParseField("started");
private static final ParseField NODE = new ParseField("node");
public static final ConstructingObjectParser<StartDatafeedResponse, Void> PARSER = public static final ConstructingObjectParser<StartDatafeedResponse, Void> PARSER =
new ConstructingObjectParser<>( new ConstructingObjectParser<>(
"start_datafeed_response", "start_datafeed_response",
true, true,
(a) -> new StartDatafeedResponse((Boolean)a[0])); (a) -> new StartDatafeedResponse((Boolean) a[0], (String) a[1]));
static { static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), STARTED);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), NODE);
} }
private final boolean started; private final boolean started;
private final String node;
public StartDatafeedResponse(boolean started) { public StartDatafeedResponse(boolean started, String node) {
this.started = started; this.started = started;
this.node = node;
} }
public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException { public static StartDatafeedResponse fromXContent(XContentParser parser) throws IOException {
@ -63,6 +67,18 @@ public class StartDatafeedResponse implements ToXContentObject {
return started; return started;
} }
/**
* The node that the datafeed was assigned to
*
* @return The ID of a node if the datafeed was assigned to a node. If an empty string is returned
* it means the datafeed was allowed to open lazily and has not yet been assigned to a node.
* If <code>null</code> is returned it means the server version is too old to return node
* information.
*/
public String getNode() {
return node;
}
@Override @Override
public boolean equals(Object other) { public boolean equals(Object other) {
if (this == other) { if (this == other) {
@ -74,18 +90,22 @@ public class StartDatafeedResponse implements ToXContentObject {
} }
StartDatafeedResponse that = (StartDatafeedResponse) other; StartDatafeedResponse that = (StartDatafeedResponse) other;
return isStarted() == that.isStarted(); return started == started
&& Objects.equals(node, that.node);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(isStarted()); return Objects.hash(started, node);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(STARTED.getPreferredName(), started); builder.field(STARTED.getPreferredName(), started);
if (node != null) {
builder.field(NODE.getPreferredName(), node);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -122,6 +122,7 @@ import org.elasticsearch.client.ml.RevertModelSnapshotRequest;
import org.elasticsearch.client.ml.RevertModelSnapshotResponse; import org.elasticsearch.client.ml.RevertModelSnapshotResponse;
import org.elasticsearch.client.ml.SetUpgradeModeRequest; import org.elasticsearch.client.ml.SetUpgradeModeRequest;
import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StartDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.StartDatafeedRequest; import org.elasticsearch.client.ml.StartDatafeedRequest;
import org.elasticsearch.client.ml.StartDatafeedResponse; import org.elasticsearch.client.ml.StartDatafeedResponse;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest; import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
@ -232,6 +233,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
@ -461,7 +463,10 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
// tag::open-job-response // tag::open-job-response
boolean isOpened = openJobResponse.isOpened(); // <1> boolean isOpened = openJobResponse.isOpened(); // <1>
String node = openJobResponse.getNode(); // <2>
// end::open-job-response // end::open-job-response
assertThat(node, notNullValue());
} }
{ {
// tag::open-job-execute-listener // tag::open-job-execute-listener
@ -1011,11 +1016,14 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
// tag::start-datafeed-execute // tag::start-datafeed-execute
StartDatafeedResponse response = client.machineLearning().startDatafeed(request, RequestOptions.DEFAULT); StartDatafeedResponse response = client.machineLearning().startDatafeed(request, RequestOptions.DEFAULT);
// end::start-datafeed-execute // end::start-datafeed-execute
// tag::start-datafeed-response // tag::start-datafeed-response
boolean started = response.isStarted(); // <1> boolean started = response.isStarted(); // <1>
String node = response.getNode(); // <2>
// end::start-datafeed-response // end::start-datafeed-response
assertTrue(started); assertTrue(started);
assertThat(node, notNullValue());
} }
{ {
StartDatafeedRequest request = new StartDatafeedRequest(datafeedId); StartDatafeedRequest request = new StartDatafeedRequest(datafeedId);
@ -3131,14 +3139,16 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
// end::start-data-frame-analytics-request // end::start-data-frame-analytics-request
// tag::start-data-frame-analytics-execute // tag::start-data-frame-analytics-execute
AcknowledgedResponse response = client.machineLearning().startDataFrameAnalytics(request, RequestOptions.DEFAULT); StartDataFrameAnalyticsResponse response = client.machineLearning().startDataFrameAnalytics(request, RequestOptions.DEFAULT);
// end::start-data-frame-analytics-execute // end::start-data-frame-analytics-execute
// tag::start-data-frame-analytics-response // tag::start-data-frame-analytics-response
boolean acknowledged = response.isAcknowledged(); boolean acknowledged = response.isAcknowledged();
String node = response.getNode(); // <1>
// end::start-data-frame-analytics-response // end::start-data-frame-analytics-response
assertThat(acknowledged, is(true)); assertThat(acknowledged, is(true));
assertThat(node, notNullValue());
} }
assertBusy( assertBusy(
() -> assertThat(getAnalyticsState(DF_ANALYTICS_CONFIG.getId()), equalTo(DataFrameAnalyticsState.STOPPED)), () -> assertThat(getAnalyticsState(DF_ANALYTICS_CONFIG.getId()), equalTo(DataFrameAnalyticsState.STOPPED)),
@ -3147,9 +3157,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
StartDataFrameAnalyticsRequest request = new StartDataFrameAnalyticsRequest("my-analytics-config"); StartDataFrameAnalyticsRequest request = new StartDataFrameAnalyticsRequest("my-analytics-config");
// tag::start-data-frame-analytics-execute-listener // tag::start-data-frame-analytics-execute-listener
ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() { ActionListener<StartDataFrameAnalyticsResponse> listener = new ActionListener<StartDataFrameAnalyticsResponse>() {
@Override @Override
public void onResponse(AcknowledgedResponse response) { public void onResponse(StartDataFrameAnalyticsResponse response) {
// <1> // <1>
} }

View File

@ -27,7 +27,8 @@ public class OpenJobResponseTests extends AbstractXContentTestCase<OpenJobRespon
@Override @Override
protected OpenJobResponse createTestInstance() { protected OpenJobResponse createTestInstance() {
return new OpenJobResponse(randomBoolean()); String node = randomFrom("", randomAlphaOfLength(10), null);
return new OpenJobResponse(randomBoolean(), node);
} }
@Override @Override
@ -37,6 +38,6 @@ public class OpenJobResponseTests extends AbstractXContentTestCase<OpenJobRespon
@Override @Override
protected boolean supportsUnknownFields() { protected boolean supportsUnknownFields() {
return false; return true;
} }
} }

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
public class StartDataFrameAnalyticsResponseTests extends AbstractXContentTestCase<StartDataFrameAnalyticsResponse> {
@Override
protected StartDataFrameAnalyticsResponse createTestInstance() {
String node = randomFrom("", randomAlphaOfLength(10), null);
return new StartDataFrameAnalyticsResponse(randomBoolean(), node);
}
@Override
protected StartDataFrameAnalyticsResponse doParseInstance(XContentParser parser) throws IOException {
return StartDataFrameAnalyticsResponse.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
}

View File

@ -27,7 +27,8 @@ public class StartDatafeedResponseTests extends AbstractXContentTestCase<StartDa
@Override @Override
protected StartDatafeedResponse createTestInstance() { protected StartDatafeedResponse createTestInstance() {
return new StartDatafeedResponse(randomBoolean()); String node = randomFrom("", randomAlphaOfLength(10), null);
return new StartDatafeedResponse(randomBoolean(), node);
} }
@Override @Override

View File

@ -30,7 +30,12 @@ execution should wait for the job to be opened.
-------------------------------------------------- --------------------------------------------------
include-tagged::{doc-tests-file}[{api}-response] include-tagged::{doc-tests-file}[{api}-response]
-------------------------------------------------- --------------------------------------------------
<1> `isOpened()` from the +{response}+ indicates if the job was successfully <1> `isOpened()` from the +{response}+ is always `true` if the job was
opened or not. opened successfully. (An exception would be thrown instead if the job
was not opened successfully.)
<2> `getNode()` returns the node that the job was assigned to. If the
job is allowed to open lazily and has not yet been assigned to a node
then an empty string is returned. If `getNode()` returns `null` then
the server is an old version that does not return node information.
include::../execution.asciidoc[] include::../execution.asciidoc[]

View File

@ -24,6 +24,15 @@ include-tagged::{doc-tests-file}[{api}-request]
include::../execution.asciidoc[] include::../execution.asciidoc[]
[id="{upid}-{api}-response"] [id="{upid}-{api}-response"]
==== Response ==== Start {dfanalytics-job} response
The returned +{response}+ object acknowledges the {dfanalytics-job} has started. The returned +{response}+ object acknowledges the {dfanalytics-job} has started.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-response]
--------------------------------------------------
<1> `getNode()` returns the node that the job was assigned to. If the
job is allowed to open lazily and has not yet been assigned to a node
then an empty string is returned. If `getNode()` returns `null` then
the server is an old version that does not return node information.

View File

@ -5,13 +5,13 @@
-- --
[role="xpack"] [role="xpack"]
[id="{upid}-{api}"] [id="{upid}-{api}"]
=== Start datafeed API === Start {dfeed} API
Starts a {ml} datafeed in the cluster. It accepts a +{request}+ object and Starts a {ml} {dfeed} in the cluster. It accepts a +{request}+ object and
responds with a +{response}+ object. responds with a +{response}+ object.
[id="{upid}-{api}-request"] [id="{upid}-{api}-request"]
==== Start datafeed request ==== Start {dfeed} request
A +{request}+ object is created referencing a non-null `datafeedId`. A +{request}+ object is created referencing a non-null `datafeedId`.
All other fields are optional for the request. All other fields are optional for the request.
@ -30,14 +30,29 @@ The following arguments are optional.
-------------------------------------------------- --------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-options] include-tagged::{doc-tests-file}[{api}-request-options]
-------------------------------------------------- --------------------------------------------------
<1> Set when the datafeed should end, the value is exclusive. <1> Set when the {dfeed} should end, the value is exclusive.
May be an epoch seconds, epoch millis or an ISO 8601 string. May be an epoch seconds, epoch millis or an ISO 8601 string.
"now" is a special value that indicates the current time. "now" is a special value that indicates the current time.
If you do not specify an end time, the datafeed runs continuously. If you do not specify an end time, the {dfeed} runs continuously.
<2> Set when the datafeed should start, the value is inclusive. <2> Set when the {dfeed} should start, the value is inclusive.
May be an epoch seconds, epoch millis or an ISO 8601 string. May be an epoch seconds, epoch millis or an ISO 8601 string.
If you do not specify a start time and the datafeed is associated with a new job, If you do not specify a start time and the {dfeed} is associated with a new job,
the analysis starts from the earliest time for which data is available. the analysis starts from the earliest time for which data is available.
<3> Set the timeout for the request <3> Set the timeout for the request
[id="{upid}-{api}-response"]
==== Start {dfeed} response
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-response]
--------------------------------------------------
<1> `isStarted()` from the +{response}+ is always `true` if the {dfeed} was
started successfully. (An exception would be thrown instead if the {dfeed}
was not started successfully.)
<2> `getNode()` returns the node that the {dfeed} was assigned to. If the
{dfeed} is allowed to open lazily and has not yet been assigned to a node
then an empty string is returned. If `getNode()` returns `null` then
the server is an old version that does not return node information.
include::../execution.asciidoc[] include::../execution.asciidoc[]

View File

@ -47,6 +47,17 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]
(Optional, time) Controls the time to wait until a job has opened. The default (Optional, time) Controls the time to wait until a job has opened. The default
value is 30 minutes. value is 30 minutes.
[[ml-open-job-response-body]]
==== {api-response-body-title}
`node`::
(string) The ID of the node that the job was opened on. If the job is allowed to
open lazily and has not yet been assigned to a node, this value is an empty string.
`opened`::
(boolean) For a successful response, this value is always `true`. On failure, an
exception is returned instead.
[[ml-open-job-example]] [[ml-open-job-example]]
==== {api-examples-title} ==== {api-examples-title}
@ -64,6 +75,7 @@ When the job opens, you receive the following results:
[source,console-result] [source,console-result]
---- ----
{ {
"opened": true "opened" : true,
"node" : "node-1"
} }
---- ----

View File

@ -92,6 +92,18 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=datafeed-id]
(Optional, time) Controls the amount of time to wait until a {dfeed} starts. (Optional, time) Controls the amount of time to wait until a {dfeed} starts.
The default value is 20 seconds. The default value is 20 seconds.
[[ml-start-datafeed-response-body]]
==== {api-response-body-title}
`node`::
(string) The ID of the node that the {dfeed} was started on.
If the {dfeed} is allowed to open lazily and has not yet been
assigned to a node, this value is an empty string.
`started`::
(boolean) For a successful response, this value is always `true`. On failure, an
exception is returned instead.
[[ml-start-datafeed-example]] [[ml-start-datafeed-example]]
==== {api-examples-title} ==== {api-examples-title}
@ -109,6 +121,7 @@ When the {dfeed} starts, you receive the following results:
[source,console-result] [source,console-result]
---- ----
{ {
"started": true "started" : true,
"node" : "node-1"
} }
---- ----

View File

@ -62,6 +62,17 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics-define]
(Optional, <<time-units,time units>>) (Optional, <<time-units,time units>>)
include::{docdir}/ml/ml-shared.asciidoc[tag=timeout-start] include::{docdir}/ml/ml-shared.asciidoc[tag=timeout-start]
[[ml-start-dfanalytics-response-body]]
==== {api-response-body-title}
`acknowledged`::
(boolean) For a successful response, this value is always `true`. On failure, an
exception is returned instead.
`node`::
(string) The ID of the node that the job was started on.
If the job is allowed to open lazily and has not yet been assigned to a node, this value is an empty string.
[[ml-start-dfanalytics-example]] [[ml-start-dfanalytics-example]]
==== {api-examples-title} ==== {api-examples-title}
@ -78,6 +89,7 @@ When the {dfanalytics-job} starts, you receive the following results:
[source,console-result] [source,console-result]
---- ----
{ {
"acknowledged" : true "acknowledged" : true,
"node" : "node-1"
} }
---- ----

View File

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class NodeAcknowledgedResponse extends AcknowledgedResponse {
public static final String NODE_FIELD = "node";
private final String node;
public NodeAcknowledgedResponse(boolean acknowledged, String node) {
super(acknowledged);
this.node = Objects.requireNonNull(node);
}
public NodeAcknowledgedResponse(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
node = in.readString();
} else {
node = "";
}
}
public String getNode() {
return node;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
out.writeString(node);
}
}
@Override
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
builder.field(NODE_FIELD, node);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NodeAcknowledgedResponse that = (NodeAcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged()
&& Objects.equals(node, that.node);
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), node);
}
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
@ -34,13 +33,13 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
public class OpenJobAction extends ActionType<AcknowledgedResponse> { public class OpenJobAction extends ActionType<NodeAcknowledgedResponse> {
public static final OpenJobAction INSTANCE = new OpenJobAction(); public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/open"; public static final String NAME = "cluster:admin/xpack/ml/job/open";
private OpenJobAction() { private OpenJobAction() {
super(NAME, AcknowledgedResponse::new); super(NAME, NodeAcknowledgedResponse::new);
} }
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject { public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
@ -261,7 +260,7 @@ public class OpenJobAction extends ActionType<AcknowledgedResponse> {
} }
} }
static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> { static class RequestBuilder extends ActionRequestBuilder<Request, NodeAcknowledgedResponse> {
RequestBuilder(ElasticsearchClient client, OpenJobAction action) { RequestBuilder(ElasticsearchClient client, OpenJobAction action) {
super(client, action, new Request()); super(client, action, new Request());

View File

@ -9,7 +9,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
@ -38,7 +37,7 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedResponse> { public class StartDataFrameAnalyticsAction extends ActionType<NodeAcknowledgedResponse> {
public static final StartDataFrameAnalyticsAction INSTANCE = new StartDataFrameAnalyticsAction(); public static final StartDataFrameAnalyticsAction INSTANCE = new StartDataFrameAnalyticsAction();
public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/start"; public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/start";
@ -46,7 +45,7 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(20, TimeUnit.SECONDS); public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(20, TimeUnit.SECONDS);
private StartDataFrameAnalyticsAction() { private StartDataFrameAnalyticsAction() {
super(NAME, AcknowledgedResponse::new); super(NAME, NodeAcknowledgedResponse::new);
} }
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject { public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
@ -146,7 +145,7 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
} }
} }
static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> { static class RequestBuilder extends ActionRequestBuilder<Request, NodeAcknowledgedResponse> {
RequestBuilder(ElasticsearchClient client, StartDataFrameAnalyticsAction action) { RequestBuilder(ElasticsearchClient client, StartDataFrameAnalyticsAction action) {
super(client, action, new Request()); super(client, action, new Request());

View File

@ -13,7 +13,6 @@ import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
@ -40,7 +39,7 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
public class StartDatafeedAction extends ActionType<AcknowledgedResponse> { public class StartDatafeedAction extends ActionType<NodeAcknowledgedResponse> {
public static final ParseField START_TIME = new ParseField("start"); public static final ParseField START_TIME = new ParseField("start");
public static final ParseField END_TIME = new ParseField("end"); public static final ParseField END_TIME = new ParseField("end");
@ -50,7 +49,7 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
public static final String NAME = "cluster:admin/xpack/ml/datafeed/start"; public static final String NAME = "cluster:admin/xpack/ml/datafeed/start";
private StartDatafeedAction() { private StartDatafeedAction() {
super(NAME, AcknowledgedResponse::new); super(NAME, NodeAcknowledgedResponse::new);
} }
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject { public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
@ -343,7 +342,7 @@ public class StartDatafeedAction extends ActionType<AcknowledgedResponse> {
} }
} }
static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> { static class RequestBuilder extends ActionRequestBuilder<Request, NodeAcknowledgedResponse> {
RequestBuilder(ElasticsearchClient client, StartDatafeedAction action) { RequestBuilder(ElasticsearchClient client, StartDatafeedAction action) {
super(client, action, new Request()); super(client, action, new Request());

View File

@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
@ -234,12 +235,12 @@ public class MachineLearningClient {
} }
public void openJob(OpenJobAction.Request request, public void openJob(OpenJobAction.Request request,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
client.execute(OpenJobAction.INSTANCE, request, listener); client.execute(OpenJobAction.INSTANCE, request, listener);
} }
public ActionFuture<AcknowledgedResponse> openJob(OpenJobAction.Request request) { public ActionFuture<NodeAcknowledgedResponse> openJob(OpenJobAction.Request request) {
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(OpenJobAction.INSTANCE, request, listener); client.execute(OpenJobAction.INSTANCE, request, listener);
return listener; return listener;
} }
@ -301,13 +302,13 @@ public class MachineLearningClient {
} }
public void startDatafeed(StartDatafeedAction.Request request, public void startDatafeed(StartDatafeedAction.Request request,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
client.execute(StartDatafeedAction.INSTANCE, request, listener); client.execute(StartDatafeedAction.INSTANCE, request, listener);
} }
public ActionFuture<AcknowledgedResponse> startDatafeed( public ActionFuture<NodeAcknowledgedResponse> startDatafeed(
StartDatafeedAction.Request request) { StartDatafeedAction.Request request) {
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(StartDatafeedAction.INSTANCE, request, listener); client.execute(StartDatafeedAction.INSTANCE, request, listener);
return listener; return listener;
} }

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
public class NodeAcknowledgedResponseTests extends AbstractBWCWireSerializationTestCase<NodeAcknowledgedResponse> {
@Override
protected NodeAcknowledgedResponse createTestInstance() {
return new NodeAcknowledgedResponse(true, randomFrom(randomAlphaOfLength(10), ""));
}
@Override
protected Writeable.Reader<NodeAcknowledgedResponse> instanceReader() {
return NodeAcknowledgedResponse::new;
}
@Override
protected NodeAcknowledgedResponse mutateInstance(NodeAcknowledgedResponse instance) {
if (instance.getNode().isEmpty()) {
return new NodeAcknowledgedResponse(true, randomAlphaOfLength(10));
} else {
return new NodeAcknowledgedResponse(true, "");
}
}
@Override
protected NodeAcknowledgedResponse mutateInstanceForVersion(NodeAcknowledgedResponse instance, Version version) {
if (version.onOrAfter(Version.V_7_8_0)) {
return instance;
} else {
return new NodeAcknowledgedResponse(true, "");
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
public class MlBasicMultiNodeIT extends ESRestTestCase { public class MlBasicMultiNodeIT extends ESRestTestCase {
@ -55,7 +56,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Response openResponse = client().performRequest( Response openResponse = client().performRequest(
new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"));
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); assertThat(entityAsMap(openResponse), hasEntry("opened", true));
Request addData = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); Request addData = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data");
addData.setEntity(new NStringEntity( addData.setEntity(new NStringEntity(
@ -136,12 +137,12 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Response openResponse = client().performRequest( Response openResponse = client().performRequest(
new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"));
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); assertThat(entityAsMap(openResponse), hasEntry("opened", true));
Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start"); Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
startRequest.addParameter("start", "0"); startRequest.addParameter("start", "0");
Response startResponse = client().performRequest(startRequest); Response startResponse = client().performRequest(startRequest);
assertEquals(Collections.singletonMap("started", true), entityAsMap(startResponse)); assertThat(entityAsMap(startResponse), hasEntry("started", true));
assertBusy(() -> { assertBusy(() -> {
try { try {
@ -175,7 +176,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Response openResponse = client().performRequest( Response openResponse = client().performRequest(
new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")); new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"));
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); assertThat(entityAsMap(openResponse), hasEntry("opened", true));
Request addDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); Request addDataRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data");
addDataRequest.setEntity(new NStringEntity( addDataRequest.setEntity(new NStringEntity(
@ -214,7 +215,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Request openRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); Request openRequest = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
openRequest.addParameter("timeout", "20s"); openRequest.addParameter("timeout", "20s");
Response openResponse2 = client().performRequest(openRequest); Response openResponse2 = client().performRequest(openRequest);
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse2)); assertThat(entityAsMap(openResponse2), hasEntry("opened", true));
// feed some more data points // feed some more data points
Request addDataRequest2 = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data"); Request addDataRequest2 = new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data");

View File

@ -24,6 +24,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
@ -46,6 +47,7 @@ import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -309,7 +311,8 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertIsStopped(jobId); assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0); assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId); NodeAcknowledgedResponse response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));
// Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED.
assertBusy(() -> { assertBusy(() -> {
@ -326,7 +329,8 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
// Now let's start it again // Now let's start it again
try { try {
startAnalytics(jobId); response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));
} catch (Exception e) { } catch (Exception e) {
if (e.getMessage().equals("Cannot start because the job has already finished")) { if (e.getMessage().equals("Cannot start because the job has already finished")) {
// That means the job had managed to complete // That means the job had managed to complete

View File

@ -1003,7 +1003,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start"); Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
startRequest.addParameter("start", "2016-06-01T00:00:00Z"); startRequest.addParameter("start", "2016-06-01T00:00:00Z");
Response response = client().performRequest(startRequest); Response response = client().performRequest(startRequest);
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"started\":true}")); assertThat(EntityUtils.toString(response.getEntity()), containsString("\"started\":true"));
assertBusy(() -> { assertBusy(() -> {
try { try {
Response getJobResponse = client().performRequest(new Request("GET", Response getJobResponse = client().performRequest(new Request("GET",
@ -1062,7 +1062,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startRequest.addParameter("start", "2016-06-01T00:00:00Z"); startRequest.addParameter("start", "2016-06-01T00:00:00Z");
Response response = client().performRequest(startRequest); Response response = client().performRequest(startRequest);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"started\":true}")); assertThat(EntityUtils.toString(response.getEntity()), containsString("\"started\":true"));
ResponseException e = expectThrows(ResponseException.class, ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId))); () -> client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId)));
@ -1154,7 +1154,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
options.addHeader("Authorization", authHeader); options.addHeader("Authorization", authHeader);
request.setOptions(options); request.setOptions(options);
Response startDatafeedResponse = client().performRequest(request); Response startDatafeedResponse = client().performRequest(request);
assertThat(EntityUtils.toString(startDatafeedResponse.getEntity()), equalTo("{\"started\":true}")); assertThat(EntityUtils.toString(startDatafeedResponse.getEntity()), containsString("\"started\":true"));
assertBusy(() -> { assertBusy(() -> {
try { try {
Response datafeedStatsResponse = client().performRequest(new Request("GET", Response datafeedStatsResponse = client().performRequest(new Request("GET",

View File

@ -25,7 +25,6 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.junit.After; import org.junit.After;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -125,7 +124,7 @@ public class MlJobIT extends ESRestTestCase {
assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage));
Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open")); Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open"));
assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); assertThat(entityAsMap(openResponse), hasEntry("opened", true));
usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage"))); usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage")));
assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage));
assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage));

View File

@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
@ -125,7 +126,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
return client().execute(DeleteDataFrameAnalyticsAction.INSTANCE, request).actionGet(); return client().execute(DeleteDataFrameAnalyticsAction.INSTANCE, request).actionGet();
} }
protected AcknowledgedResponse startAnalytics(String id) { protected NodeAcknowledgedResponse startAnalytics(String id) {
StartDataFrameAnalyticsAction.Request request = new StartDataFrameAnalyticsAction.Request(id); StartDataFrameAnalyticsAction.Request request = new StartDataFrameAnalyticsAction.Request(id);
return client().execute(StartDataFrameAnalyticsAction.INSTANCE, request).actionGet(); return client().execute(StartDataFrameAnalyticsAction.INSTANCE, request).actionGet();
} }

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams; import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
@ -29,11 +30,13 @@ import java.util.Set;
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent; import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
@ -246,7 +249,8 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertIsStopped(jobId); assertIsStopped(jobId);
assertProgress(jobId, 0, 0, 0, 0); assertProgress(jobId, 0, 0, 0, 0);
startAnalytics(jobId); NodeAcknowledgedResponse response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));
// Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED.
assertBusy(() -> { assertBusy(() -> {
@ -263,7 +267,8 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
// Now let's start it again // Now let's start it again
try { try {
startAnalytics(jobId); response = startAnalytics(jobId);
assertThat(response.getNode(), not(emptyString()));
} catch (Exception e) { } catch (Exception e) {
if (e.getMessage().equals("Cannot start because the job has already finished")) { if (e.getMessage().equals("Cannot start because the job has already finished")) {
// That means the job had managed to complete // That means the job had managed to complete

View File

@ -23,6 +23,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
@ -36,12 +37,14 @@ import java.util.Map;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.Matchers.startsWith;
@ -571,7 +574,8 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
assertIsStopped(id); assertIsStopped(id);
// Due to lazy start being allowed, this should succeed even though no node currently in the cluster is big enough // Due to lazy start being allowed, this should succeed even though no node currently in the cluster is big enough
startAnalytics(id); NodeAcknowledgedResponse response = startAnalytics(id);
assertThat(response.getNode(), emptyString());
// Wait until state is STARTING, there is no node but there is an assignment explanation. // Wait until state is STARTING, there is no node but there is an assignment explanation.
assertBusy(() -> { assertBusy(() -> {
@ -620,7 +624,8 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
putAnalytics(config); putAnalytics(config);
assertIsStopped(id); assertIsStopped(id);
startAnalytics(id); NodeAcknowledgedResponse response = startAnalytics(id);
assertThat(response.getNode(), not(emptyString()));
// Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED. // Wait until state is one of REINDEXING or ANALYZING, or until it is STOPPED.
assertBusy(() -> { assertBusy(() -> {
@ -633,7 +638,8 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
// Now let's start it again // Now let's start it again
try { try {
startAnalytics(id); response = startAnalytics(id);
assertThat(response.getNode(), not(emptyString()));
} catch (Exception e) { } catch (Exception e) {
if (e.getMessage().equals("Cannot start because the job has already finished")) { if (e.getMessage().equals("Cannot start because the job has already finished")) {
// That means the job had managed to complete // That means the job had managed to complete

View File

@ -14,7 +14,6 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -46,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobState;
@ -81,7 +81,7 @@ import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off. In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off.
The open job api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue. The open job api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue.
*/ */
public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAction.Request, AcknowledgedResponse> { public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAction.Request, NodeAcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportOpenJobAction.class); private static final Logger logger = LogManager.getLogger(TransportOpenJobAction.class);
@ -193,8 +193,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
} }
@Override @Override
protected AcknowledgedResponse read(StreamInput in) throws IOException { protected NodeAcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in); return new NodeAcknowledgedResponse(in);
} }
@Override @Override
@ -206,7 +206,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
} }
@Override @Override
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) { protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<NodeAcknowledgedResponse> listener) {
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId())); listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId()));
return; return;
@ -216,10 +216,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
if (licenseState.isMachineLearningAllowed()) { if (licenseState.isMachineLearningAllowed()) {
// Clear job finished time once the job is started and respond // Clear job finished time once the job is started and respond
ActionListener<AcknowledgedResponse> clearJobFinishTime = ActionListener.wrap( ActionListener<NodeAcknowledgedResponse> clearJobFinishTime = ActionListener.wrap(
response -> { response -> {
if (response.isAcknowledged()) { if (response.isAcknowledged()) {
clearJobFinishedTime(jobParams.getJobId(), listener); clearJobFinishedTime(response, jobParams.getJobId(), listener);
} else { } else {
listener.onResponse(response); listener.onResponse(response);
} }
@ -272,7 +272,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
} }
} }
private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<AcknowledgedResponse> listener) { private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<NodeAcknowledgedResponse> listener) {
JobPredicate predicate = new JobPredicate(); JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(), persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskListener<OpenJobAction.JobParams>() { new PersistentTasksService.WaitForPersistentTaskListener<OpenJobAction.JobParams>() {
@ -287,7 +287,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
listener.onFailure(predicate.exception); listener.onFailure(predicate.exception);
} }
} else { } else {
listener.onResponse(new AcknowledgedResponse(true)); listener.onResponse(new NodeAcknowledgedResponse(true, predicate.node));
} }
} }
@ -304,21 +304,21 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}); });
} }
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) { private void clearJobFinishedTime(NodeAcknowledgedResponse response, String jobId, ActionListener<NodeAcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)), job -> listener.onResponse(response),
e -> { e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e); logger.error("[" + jobId + "] Failed to clear finished_time", e);
// Not a critical error so continue // Not a critical error so continue
listener.onResponse(new AcknowledgedResponse(true)); listener.onResponse(response);
} }
)); ));
} }
private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception, private void cancelJobStart(PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
persistentTasksService.sendRemoveRequest(persistentTask.getId(), persistentTasksService.sendRemoveRequest(persistentTask.getId(),
new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
@Override @Override
@ -546,6 +546,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private static class JobPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> { private static class JobPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
private volatile Exception exception; private volatile Exception exception;
private volatile String node = "";
private volatile boolean shouldCancel; private volatile boolean shouldCancel;
@Override @Override
@ -591,6 +592,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
case CLOSED: case CLOSED:
return false; return false;
case OPENED: case OPENED:
node = persistentTask.getExecutorNode();
return true; return true;
case CLOSING: case CLOSING:
exception = ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be " exception = ExceptionsHelper.conflictStatusException("The job has been " + JobState.CLOSED + " while waiting to be "

View File

@ -16,7 +16,6 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -54,6 +53,7 @@ import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@ -92,7 +92,7 @@ import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
* Starts the persistent task for running data frame analytics. * Starts the persistent task for running data frame analytics.
*/ */
public class TransportStartDataFrameAnalyticsAction public class TransportStartDataFrameAnalyticsAction
extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, AcknowledgedResponse> { extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, NodeAcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class); private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class);
private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active"; private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active";
@ -138,8 +138,8 @@ public class TransportStartDataFrameAnalyticsAction
} }
@Override @Override
protected AcknowledgedResponse read(StreamInput in) throws IOException { protected NodeAcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in); return new NodeAcknowledgedResponse(in);
} }
@Override @Override
@ -152,7 +152,7 @@ public class TransportStartDataFrameAnalyticsAction
@Override @Override
protected void masterOperation(StartDataFrameAnalyticsAction.Request request, ClusterState state, protected void masterOperation(StartDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
if (licenseState.isMachineLearningAllowed() == false) { if (licenseState.isMachineLearningAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
return; return;
@ -385,7 +385,7 @@ public class TransportStartDataFrameAnalyticsAction
} }
private void waitForAnalyticsStarted(PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task, private void waitForAnalyticsStarted(PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task,
TimeValue timeout, ActionListener<AcknowledgedResponse> listener) { TimeValue timeout, ActionListener<NodeAcknowledgedResponse> listener) {
AnalyticsPredicate predicate = new AnalyticsPredicate(); AnalyticsPredicate predicate = new AnalyticsPredicate();
persistentTasksService.waitForPersistentTaskCondition(task.getId(), predicate, timeout, persistentTasksService.waitForPersistentTaskCondition(task.getId(), predicate, timeout,
@ -399,7 +399,7 @@ public class TransportStartDataFrameAnalyticsAction
cancelAnalyticsStart(task, predicate.exception, listener); cancelAnalyticsStart(task, predicate.exception, listener);
} else { } else {
auditor.info(task.getParams().getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED); auditor.info(task.getParams().getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED);
listener.onResponse(new AcknowledgedResponse(true)); listener.onResponse(new NodeAcknowledgedResponse(true, predicate.node));
} }
} }
@ -454,6 +454,7 @@ public class TransportStartDataFrameAnalyticsAction
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> { private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
private volatile Exception exception; private volatile Exception exception;
private volatile String node = "";
private volatile String assignmentExplanation; private volatile String assignmentExplanation;
@Override @Override
@ -488,6 +489,7 @@ public class TransportStartDataFrameAnalyticsAction
case STARTED: case STARTED:
case REINDEXING: case REINDEXING:
case ANALYZING: case ANALYZING:
node = persistentTask.getExecutorNode();
return true; return true;
case STOPPING: case STOPPING:
exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started"); exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started");
@ -510,7 +512,7 @@ public class TransportStartDataFrameAnalyticsAction
private void cancelAnalyticsStart( private void cancelAnalyticsStart(
PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask, Exception exception, PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask, Exception exception,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
persistentTasksService.sendRemoveRequest(persistentTask.getId(), persistentTasksService.sendRemoveRequest(persistentTask.getId(),
new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
@Override @Override

View File

@ -12,7 +12,6 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -42,6 +41,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
@ -78,7 +78,7 @@ import java.util.function.Predicate;
In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off. In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off.
The start datafeed api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue. The start datafeed api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue.
*/ */
public class TransportStartDatafeedAction extends TransportMasterNodeAction<StartDatafeedAction.Request, AcknowledgedResponse> { public class TransportStartDatafeedAction extends TransportMasterNodeAction<StartDatafeedAction.Request, NodeAcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportStartDatafeedAction.class); private static final Logger logger = LogManager.getLogger(TransportStartDatafeedAction.class);
@ -147,13 +147,13 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
} }
@Override @Override
protected AcknowledgedResponse read(StreamInput in) throws IOException { protected NodeAcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in); return new NodeAcknowledgedResponse(in);
} }
@Override @Override
protected void masterOperation(StartDatafeedAction.Request request, ClusterState state, protected void masterOperation(StartDatafeedAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
StartDatafeedAction.DatafeedParams params = request.getParams(); StartDatafeedAction.DatafeedParams params = request.getParams();
if (licenseState.isMachineLearningAllowed() == false) { if (licenseState.isMachineLearningAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
@ -282,7 +282,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
} }
private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params, private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params,
ActionListener<AcknowledgedResponse> listener) { ActionListener<NodeAcknowledgedResponse> listener) {
DatafeedPredicate predicate = new DatafeedPredicate(); DatafeedPredicate predicate = new DatafeedPredicate();
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(), persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() { new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
@ -294,7 +294,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
// what would have happened if the error had been detected in the "fast fail" validation // what would have happened if the error had been detected in the "fast fail" validation
cancelDatafeedStart(persistentTask, predicate.exception, listener); cancelDatafeedStart(persistentTask, predicate.exception, listener);
} else { } else {
listener.onResponse(new AcknowledgedResponse(true)); listener.onResponse(new NodeAcknowledgedResponse(true, predicate.node));
} }
} }
@ -312,7 +312,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
} }
private void cancelDatafeedStart(PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask, private void cancelDatafeedStart(PersistentTasksCustomMetadata.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask,
Exception exception, ActionListener<AcknowledgedResponse> listener) { Exception exception, ActionListener<NodeAcknowledgedResponse> listener) {
persistentTasksService.sendRemoveRequest(persistentTask.getId(), persistentTasksService.sendRemoveRequest(persistentTask.getId(),
new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
@Override @Override
@ -493,6 +493,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> { private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> {
private volatile Exception exception; private volatile Exception exception;
private volatile String node = "";
@Override @Override
public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) { public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTask) {
@ -513,7 +514,11 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
} }
} }
DatafeedState datafeedState = (DatafeedState) persistentTask.getState(); DatafeedState datafeedState = (DatafeedState) persistentTask.getState();
return datafeedState == DatafeedState.STARTED; if (datafeedState == DatafeedState.STARTED) {
node = persistentTask.getExecutorNode();
return true;
}
return false;
} }
} }
} }

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.xpack.ml.rest.datafeeds; package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -16,6 +15,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
@ -71,12 +71,14 @@ public class RestStartDatafeedAction extends BaseRestHandler {
} }
return channel -> { return channel -> {
client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest, client.execute(StartDatafeedAction.INSTANCE, jobDatafeedRequest,
new RestBuilderListener<AcknowledgedResponse>(channel) { new RestBuilderListener<NodeAcknowledgedResponse>(channel) {
@Override @Override
public RestResponse buildResponse(AcknowledgedResponse r, XContentBuilder builder) throws Exception { public RestResponse buildResponse(NodeAcknowledgedResponse r, XContentBuilder builder) throws Exception {
// This doesn't use the toXContent of the response object because we rename "acknowledged" to "started"
builder.startObject(); builder.startObject();
builder.field("started", r.isAcknowledged()); builder.field("started", r.isAcknowledged());
builder.field(NodeAcknowledgedResponse.NODE_FIELD, r.getNode());
builder.endObject(); builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder); return new BytesRestResponse(RestStatus.OK, builder);
} }

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.xpack.ml.rest.job; package org.elasticsearch.xpack.ml.rest.job;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -15,6 +14,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
@ -61,11 +61,13 @@ public class RestOpenJobAction extends BaseRestHandler {
request = new OpenJobAction.Request(jobParams); request = new OpenJobAction.Request(jobParams);
} }
return channel -> { return channel -> {
client.execute(OpenJobAction.INSTANCE, request, new RestBuilderListener<AcknowledgedResponse>(channel) { client.execute(OpenJobAction.INSTANCE, request, new RestBuilderListener<NodeAcknowledgedResponse>(channel) {
@Override @Override
public RestResponse buildResponse(AcknowledgedResponse r, XContentBuilder builder) throws Exception { public RestResponse buildResponse(NodeAcknowledgedResponse r, XContentBuilder builder) throws Exception {
// This doesn't use the toXContent of the response object because we rename "acknowledged" to "opened"
builder.startObject(); builder.startObject();
builder.field("opened", r.isAcknowledged()); builder.field("opened", r.isAcknowledged());
builder.field(NodeAcknowledgedResponse.NODE_FIELD, r.getNode());
builder.endObject(); builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder); return new BytesRestResponse(RestStatus.OK, builder);
} }

View File

@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.InternalInferModelAction; import org.elasticsearch.xpack.core.ml.action.InternalInferModelAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction;
@ -131,7 +132,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// test that license restricted apis do not work // test that license restricted apis do not work
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) { try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), listener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), listener);
listener.actionGet(); listener.actionGet();
fail("open job action should not be enabled!"); fail("open job action should not be enabled!");
@ -155,7 +156,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// test that license restricted apis do now work // test that license restricted apis do now work
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) { try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), listener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), listener);
AcknowledgedResponse response = listener.actionGet(); AcknowledgedResponse response = listener.actionGet();
assertNotNull(response); assertNotNull(response);
@ -232,12 +233,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse); assertNotNull(putDatafeedResponse);
// open job // open job
PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener);
AcknowledgedResponse openJobResponse = openJobListener.actionGet(); AcknowledgedResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse); assertNotNull(openJobResponse);
// start datafeed // start datafeed
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener);
listener.actionGet(); listener.actionGet();
} }
@ -270,12 +271,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) { try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// open job // open job
PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener);
AcknowledgedResponse openJobResponse = openJobListener.actionGet(); AcknowledgedResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse); assertNotNull(openJobResponse);
// start datafeed // start datafeed
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener);
listener.actionGet(); listener.actionGet();
} }
@ -334,7 +335,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
Collections.singletonList(datafeedIndex))), putDatafeedListener); Collections.singletonList(datafeedIndex))), putDatafeedListener);
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse); assertNotNull(putDatafeedResponse);
PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener);
AcknowledgedResponse openJobResponse = openJobListener.actionGet(); AcknowledgedResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse); assertNotNull(openJobResponse);
@ -357,7 +358,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// test that license restricted apis do not work // test that license restricted apis do not work
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) { try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener);
listener.actionGet(); listener.actionGet();
fail("start datafeed action should not be enabled!"); fail("start datafeed action should not be enabled!");
@ -375,12 +376,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) { try (TransportClient client = new TestXPackTransportClient(settings, LocalStateMachineLearning.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// re-open job now that the license is valid again // re-open job now that the license is valid again
PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener);
AcknowledgedResponse openJobResponse = openJobListener.actionGet(); AcknowledgedResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse); assertNotNull(openJobResponse);
PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> listener = PlainActionFuture.newFuture();
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request(datafeedId, 0L), listener);
AcknowledgedResponse response = listener.actionGet(); AcknowledgedResponse response = listener.actionGet();
assertNotNull(response); assertNotNull(response);
@ -408,11 +409,11 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
Collections.singletonList(datafeedIndex))), putDatafeedListener); Collections.singletonList(datafeedIndex))), putDatafeedListener);
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse); assertNotNull(putDatafeedResponse);
PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener);
AcknowledgedResponse openJobResponse = openJobListener.actionGet(); AcknowledgedResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse); assertNotNull(openJobResponse);
PlainActionFuture<AcknowledgedResponse> startDatafeedListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> startDatafeedListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).startDatafeed( new MachineLearningClient(client).startDatafeed(
new StartDatafeedAction.Request(datafeedId, 0L), startDatafeedListener); new StartDatafeedAction.Request(datafeedId, 0L), startDatafeedListener);
AcknowledgedResponse startDatafeedResponse = startDatafeedListener.actionGet(); AcknowledgedResponse startDatafeedResponse = startDatafeedListener.actionGet();
@ -464,7 +465,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob(jobId)), putJobListener); new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob(jobId)), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet(); PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse); assertNotNull(putJobResponse);
PlainActionFuture<AcknowledgedResponse> openJobListener = PlainActionFuture.newFuture(); PlainActionFuture<NodeAcknowledgedResponse> openJobListener = PlainActionFuture.newFuture();
new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener); new MachineLearningClient(client).openJob(new OpenJobAction.Request(jobId), openJobListener);
AcknowledgedResponse openJobResponse = openJobListener.actionGet(); AcknowledgedResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse); assertNotNull(openJobResponse);

View File

@ -143,6 +143,7 @@
ml.open_job: ml.open_job:
job_id: job-model-memory-limit-as-string job_id: job-model-memory-limit-as-string
- match: { opened: true } - match: { opened: true }
- match: { node: "" }
- do: - do:
headers: headers:
@ -563,6 +564,7 @@
ml.open_job: ml.open_job:
job_id: delete-opened-job job_id: delete-opened-job
- match: { opened: true } - match: { opened: true }
- match: { node: /\S+/ }
- do: - do:
catch: /Cannot delete job \[delete-opened-job\] because the job is opened/ catch: /Cannot delete job \[delete-opened-job\] because the job is opened/
@ -1446,6 +1448,7 @@
ml.open_job: ml.open_job:
job_id: persistent-task-allocation-allowed-test job_id: persistent-task-allocation-allowed-test
- match: { opened: true } - match: { opened: true }
- match: { node: /\S+/ }
--- ---
"Test reopen job resets the finished time": "Test reopen job resets the finished time":

View File

@ -365,14 +365,17 @@ setup:
ml.start_datafeed: ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-foo-1-feed" datafeed_id: "start-stop-datafeed-job-foo-1-feed"
- match: { started: true } - match: { started: true }
- match: { node: /\S+/ }
- do: - do:
ml.start_datafeed: ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-foo-2-feed" datafeed_id: "start-stop-datafeed-job-foo-2-feed"
- match: { started: true } - match: { started: true }
- match: { node: /\S+/ }
- do: - do:
ml.start_datafeed: ml.start_datafeed:
datafeed_id: "start-stop-datafeed-job-bar-1-feed" datafeed_id: "start-stop-datafeed-job-bar-1-feed"
- match: { started: true } - match: { started: true }
- match: { node: /\S+/ }
- do: - do:
ml.stop_datafeed: ml.stop_datafeed: