Remove AcknowledgedRestListener in favour of RestToXContentListener (elastic/x-pack-elasticsearch#3985)

Adapt to AcknowledgedRestListener removal 

Original commit: elastic/x-pack-elasticsearch@74c08fcf02
This commit is contained in:
Luca Cavanna 2018-02-22 09:13:58 +01:00 committed by GitHub
parent 28bbb8b3cd
commit 79bc6d9a53
38 changed files with 177 additions and 210 deletions

View File

@ -100,7 +100,6 @@ When the operation is complete, you receive the following results:
[source,js]
----
{
"acknowledged": true,
"model": {
"job_id": "it_ops_new_kpi",
"timestamp": 1491856080000,

View File

@ -6,19 +6,17 @@
package org.elasticsearch.license;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class PutLicenseResponse extends AcknowledgedResponse implements ToXContentFragment {
public class PutLicenseResponse extends AcknowledgedResponse {
private LicensesStatus status;
private Map<String, String[]> acknowledgeMessages;
@ -88,8 +86,7 @@ public class PutLicenseResponse extends AcknowledgedResponse implements ToXConte
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("acknowledged", isAcknowledged());
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
switch (status) {
case VALID:
builder.field("license_status", "valid");
@ -115,19 +112,10 @@ public class PutLicenseResponse extends AcknowledgedResponse implements ToXConte
}
builder.endObject();
}
return builder;
}
@Override
public String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.string();
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
return Strings.toString(this, true, true);
}
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.license;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.rest.XPackRestHandler;
@ -34,7 +34,6 @@ public class RestDeleteLicenseAction extends XPackRestHandler {
deleteLicenseRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteLicenseRequest.masterNodeTimeout()));
return channel -> client.es().admin().cluster().execute(DeleteLicenseAction.INSTANCE, deleteLicenseRequest,
new AcknowledgedRestListener<>(channel));
new RestToXContentListener<>(channel));
}
}

View File

@ -5,20 +5,15 @@
*/
package org.elasticsearch.license;
import java.io.IOException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.rest.XPackRestHandler;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
@ -47,14 +42,6 @@ public class RestPutLicenseAction extends XPackRestHandler {
putLicenseRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putLicenseRequest.masterNodeTimeout()));
return channel -> client.es().admin().cluster().execute(PutLicenseAction.INSTANCE, putLicenseRequest,
new RestBuilderListener<PutLicenseResponse>(channel) {
@Override
public RestResponse buildResponse(PutLicenseResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
});
new RestToXContentListener<>(channel));
}
}

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@ -140,7 +141,7 @@ public class PostCalendarEventsAction extends Action<PostCalendarEventsAction.Re
}
}
public static class Response extends AcknowledgedResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ToXContentObject {
private List<ScheduledEvent> scheduledEvents;
@ -148,21 +149,28 @@ public class PostCalendarEventsAction extends Action<PostCalendarEventsAction.Re
}
public Response(List<ScheduledEvent> scheduledEvents) {
super(true);
this.scheduledEvents = scheduledEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
//TODO version needs to be updated once backport to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag was removed
in.readBoolean();
}
in.readList(ScheduledEvent::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
//TODO version needs to be updated once backport to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag is no longer supported
out.writeBoolean(true);
}
out.writeList(scheduledEvents);
}
@ -176,7 +184,7 @@ public class PostCalendarEventsAction extends Action<PostCalendarEventsAction.Re
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), scheduledEvents);
return Objects.hash(scheduledEvents);
}
@Override
@ -188,7 +196,7 @@ public class PostCalendarEventsAction extends Action<PostCalendarEventsAction.Re
return false;
}
Response other = (Response) obj;
return Objects.equals(isAcknowledged(), other.isAcknowledged()) && Objects.equals(scheduledEvents, other.scheduledEvents);
return Objects.equals(scheduledEvents, other.scheduledEvents);
}
}
}

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -137,7 +138,7 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
}
}
public static class Response extends AcknowledgedResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ToXContentObject {
private Calendar calendar;
@ -145,14 +146,17 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
}
public Response(Calendar calendar) {
super(true);
this.calendar = calendar;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
//TODO version needs to be updated once backport to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag was removed
in.readBoolean();
}
calendar = new Calendar(in);
}
@ -160,7 +164,11 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
//TODO version needs to be updated once backport to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag is no longer supported
out.writeBoolean(true);
}
calendar.writeTo(out);
}
@ -171,7 +179,7 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
@Override
public int hashCode() {
return Objects.hash(isAcknowledged(), calendar);
return Objects.hash(calendar);
}
@Override
@ -183,7 +191,7 @@ public class PutCalendarAction extends Action<PutCalendarAction.Request, PutCale
return false;
}
Response other = (Response) obj;
return Objects.equals(isAcknowledged(), other.isAcknowledged()) && Objects.equals(calendar, other.calendar);
return Objects.equals(calendar, other.calendar);
}
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
@ -105,12 +106,11 @@ public class PutDatafeedAction extends Action<PutDatafeedAction.Request, PutData
}
}
public static class Response extends AcknowledgedResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ToXContentObject {
private DatafeedConfig datafeed;
public Response(boolean acked, DatafeedConfig datafeed) {
super(acked);
public Response(DatafeedConfig datafeed) {
this.datafeed = datafeed;
}
@ -124,14 +124,22 @@ public class PutDatafeedAction extends Action<PutDatafeedAction.Request, PutData
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
//TODO version needs to be updated once backport to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag was removed
in.readBoolean();
}
datafeed = new DatafeedConfig(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
//TODO version needs to be updated once backport to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag is no longer supported
out.writeBoolean(true);
}
datafeed.writeTo(out);
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;
@ -130,12 +131,11 @@ public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Resp
}
}
public static class Response extends AcknowledgedResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ToXContentObject {
private Job job;
public Response(boolean acked, Job job) {
super(acked);
public Response(Job job) {
this.job = job;
}
@ -149,20 +149,27 @@ public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Resp
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
//TODO version needs to be updated once backport to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag was removed
in.readBoolean();
}
job = new Job(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
//TODO version needs to be updated once backport to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag is no longer supported
out.writeBoolean(true);
}
job.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Don't serialize acknowledged because current api directly serializes the job details
builder.startObject();
job.doXContentBody(builder, params);
builder.endObject();
@ -182,5 +189,4 @@ public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Resp
return Objects.hash(job);
}
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
@ -157,9 +158,8 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
}
}
public static class Response extends AcknowledgedResponse implements StatusToXContentObject {
public static class Response extends ActionResponse implements StatusToXContentObject {
private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
private static final ParseField MODEL = new ParseField("model");
private ModelSnapshot model;
@ -168,7 +168,6 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
}
public Response(ModelSnapshot modelSnapshot) {
super(true);
model = modelSnapshot;
}
@ -179,14 +178,22 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
//TODO version needs to be updated once backport to 6.x
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag was removed
in.readBoolean();
}
model = new ModelSnapshot(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
//TODO version needs to be updated once backport to 6.x
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1) == false) {
//the acknowledged flag is no longer supported
out.writeBoolean(true);
}
model.writeTo(out);
}
@ -198,7 +205,6 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ACKNOWLEDGED.getPreferredName(), true);
builder.field(MODEL.getPreferredName());
builder = model.toXContent(builder, params);
builder.endObject();

View File

@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
public class PutLicenseResponseTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testSerialization() throws Exception {
boolean acknowledged = randomBoolean();
LicensesStatus status = randomFrom(LicensesStatus.VALID, LicensesStatus.INVALID, LicensesStatus.EXPIRED);
@ -32,9 +33,7 @@ public class PutLicenseResponseTests extends ESTestCase {
PutLicenseResponse response = new PutLicenseResponse(acknowledged, status, "", ackMessages);
XContentBuilder contentBuilder = XContentFactory.jsonBuilder();
contentBuilder.startObject();
response.toXContent(contentBuilder, ToXContent.EMPTY_PARAMS);
contentBuilder.endObject();
Map<String, Object> map = XContentHelper.convertToMap(contentBuilder.bytes(), false, contentBuilder.contentType()).v2();
assertThat(map.containsKey("acknowledged"), equalTo(true));
@ -46,7 +45,7 @@ public class PutLicenseResponseTests extends ESTestCase {
assertThat(actualStatus, equalTo(status.name().toLowerCase(Locale.ROOT)));
assertThat(map.containsKey("acknowledge"), equalTo(true));
Map<String, List<String>> actualAckMessages = (HashMap) map.get("acknowledge");
Map<String, List<String>> actualAckMessages = (Map<String, List<String>>) map.get("acknowledge");
assertTrue(actualAckMessages.containsKey("message"));
actualAckMessages.remove("message");
assertThat(actualAckMessages.keySet(), equalTo(ackMessages.keySet()));
@ -82,7 +81,7 @@ public class PutLicenseResponseTests extends ESTestCase {
}
}
private Map<String, String[]> randomAckMessages() {
private static Map<String, String[]> randomAckMessages() {
int nFeatures = randomIntBetween(1, 5);
Map<String, String[]> ackMessages = new HashMap<>();

View File

@ -24,7 +24,7 @@ public class PutDatafeedActionResponseTests extends AbstractStreamableTestCase<R
DatafeedConfigTests.randomValidDatafeedId(), randomAlphaOfLength(10));
datafeedConfig.setIndices(Arrays.asList(randomAlphaOfLength(10)));
datafeedConfig.setTypes(Arrays.asList(randomAlphaOfLength(10)));
return new Response(randomBoolean(), datafeedConfig.build());
return new Response(datafeedConfig.build());
}
@Override

View File

@ -17,7 +17,7 @@ public class PutJobActionResponseTests extends AbstractStreamableTestCase<Respon
@Override
protected Response createTestInstance() {
Job.Builder builder = buildJobBuilder(randomValidJobId());
return new Response(randomBoolean(), builder.build());
return new Response(builder.build());
}
@Override

View File

@ -131,7 +131,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
if (acknowledged) {
logger.info("Created datafeed [{}]", request.getDatafeed().getId());
}
return new PutDatafeedAction.Response(acknowledged, request.getDatafeed());
return new PutDatafeedAction.Response(request.getDatafeed());
}
@Override

View File

@ -107,26 +107,24 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
// wrap the listener with one that invokes the OldDataRemover on
// acknowledged responses
return ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
Date deleteAfter = modelSnapshot.getLatestResultTimeStamp();
logger.debug("Removing intervening records: last record: " + deleteAfter + ", last result: "
+ modelSnapshot.getLatestResultTimeStamp());
Date deleteAfter = modelSnapshot.getLatestResultTimeStamp();
logger.debug("Removing intervening records: last record: " + deleteAfter + ", last result: "
+ modelSnapshot.getLatestResultTimeStamp());
logger.info("Deleting results after '" + deleteAfter + "'");
logger.info("Deleting results after '" + deleteAfter + "'");
JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
listener.onResponse(response);
}
JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}, listener::onFailure);
}
@ -136,22 +134,20 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
return ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
jobProvider.dataCounts(jobId, counts -> {
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
listener.onResponse(response);
}
jobProvider.dataCounts(jobId, counts -> {
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}, listener::onFailure);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}, listener::onFailure);
}, listener::onFailure);
}

View File

@ -59,7 +59,7 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<Upd
if (acknowledged) {
logger.info("Updated datafeed [{}]", request.getUpdate().getId());
}
return new PutDatafeedAction.Response(acknowledged, updatedDatafeed);
return new PutDatafeedAction.Response(updatedDatafeed);
}
@Override

View File

@ -199,7 +199,7 @@ public class JobManager extends AbstractComponent {
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
auditor.info(job.getId(), Messages.getMessage(Messages.JOB_AUDIT_CREATED));
return new PutJobAction.Response(acknowledged, job);
return new PutJobAction.Response(job);
}
@Override
@ -276,7 +276,7 @@ public class JobManager extends AbstractComponent {
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
return new PutJobAction.Response(acknowledged, updatedJob);
return new PutJobAction.Response(updatedJob);
}
@Override
@ -460,7 +460,7 @@ public class JobManager extends AbstractComponent {
if (response) {
ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSizeStats).setLogTime(new Date()).build();
persister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(
modelSizeStatsResponseHandler::accept, actionListener::onFailure));
modelSizeStatsResponseHandler, actionListener::onFailure));
}
};
@ -481,7 +481,7 @@ public class JobManager extends AbstractComponent {
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
Job.Builder builder = new Job.Builder(job);
builder.setModelSnapshotId(modelSnapshot.getSnapshotId());
@ -506,5 +506,4 @@ public class JobManager extends AbstractComponent {
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, builder.build()).build());
return newState.build();
}
}

View File

@ -10,10 +10,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -33,6 +33,6 @@ public class RestDeleteCalendarAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeleteCalendarAction.Request request = new DeleteCalendarAction.Request(restRequest.param(Calendar.ID.getPreferredName()));
return channel -> client.execute(DeleteCalendarAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(DeleteCalendarAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -10,11 +10,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -37,6 +37,6 @@ public class RestDeleteCalendarEventAction extends BaseRestHandler {
String eventId = restRequest.param(ScheduledEvent.EVENT_ID.getPreferredName());
String calendarId = restRequest.param(Calendar.ID.getPreferredName());
DeleteCalendarEventAction.Request request = new DeleteCalendarEventAction.Request(calendarId, eventId);
return channel -> client.execute(DeleteCalendarEventAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(DeleteCalendarEventAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -10,11 +10,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -40,6 +40,6 @@ public class RestDeleteDatafeedAction extends BaseRestHandler {
}
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
return channel -> client.execute(DeleteDatafeedAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(DeleteDatafeedAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -10,10 +10,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction.Request;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -35,7 +35,7 @@ public class RestDeleteFilterAction extends BaseRestHandler {
Request request = new Request(restRequest.param(Request.FILTER_ID.getPreferredName()));
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
return channel -> client.execute(DeleteFilterAction.INSTANCE, request, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(DeleteFilterAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -11,10 +11,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -36,7 +36,7 @@ public class RestPutFilterAction extends BaseRestHandler {
String filterId = restRequest.param(MlFilter.ID.getPreferredName());
XContentParser parser = restRequest.contentOrSourceParamParser();
PutFilterAction.Request putFilterRequest = PutFilterAction.Request.parseRequest(filterId, parser);
return channel -> client.execute(PutFilterAction.INSTANCE, putFilterRequest, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(PutFilterAction.INSTANCE, putFilterRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -10,11 +10,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -37,6 +37,6 @@ public class RestDeleteJobAction extends BaseRestHandler {
deleteJobRequest.setForce(restRequest.paramAsBoolean(CloseJobAction.Request.FORCE.getPreferredName(), deleteJobRequest.isForce()));
deleteJobRequest.timeout(restRequest.paramAsTime("timeout", deleteJobRequest.timeout()));
deleteJobRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", deleteJobRequest.masterNodeTimeout()));
return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -10,7 +10,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -37,6 +37,6 @@ public class RestDeleteModelSnapshotAction extends BaseRestHandler {
restRequest.param(Job.ID.getPreferredName()),
restRequest.param(ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));
return channel -> client.execute(DeleteModelSnapshotAction.INSTANCE, deleteModelSnapshot, new AcknowledgedRestListener<>(channel));
return channel -> client.execute(DeleteModelSnapshotAction.INSTANCE, deleteModelSnapshot, new RestToXContentListener<>(channel));
}
}

View File

@ -11,9 +11,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -34,7 +34,7 @@ public class RestValidateDetectorAction extends BaseRestHandler {
XContentParser parser = restRequest.contentOrSourceParamParser();
ValidateDetectorAction.Request validateDetectorRequest = ValidateDetectorAction.Request.parseRequest(parser);
return channel ->
client.execute(ValidateDetectorAction.INSTANCE, validateDetectorRequest, new AcknowledgedRestListener<>(channel));
client.execute(ValidateDetectorAction.INSTANCE, validateDetectorRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -11,9 +11,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
@ -34,7 +34,7 @@ public class RestValidateJobConfigAction extends BaseRestHandler {
XContentParser parser = restRequest.contentOrSourceParamParser();
ValidateJobConfigAction.Request validateConfigRequest = ValidateJobConfigAction.Request.parseRequest(parser);
return channel ->
client.execute(ValidateJobConfigAction.INSTANCE, validateConfigRequest, new AcknowledgedRestListener<>(channel));
client.execute(ValidateJobConfigAction.INSTANCE, validateConfigRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -294,14 +294,13 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
});
}
private void putJob() throws Exception {
private void putJob() {
Detector detector = new Detector.Builder("dc", "by_instance").build();
Job.Builder jobBuilder = new Job.Builder(JOB_ID);
jobBuilder.setDataDescription(new DataDescription.Builder());
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
PutJobAction.Request request = new PutJobAction.Request(jobBuilder);
PutJobAction.Response response = client().execute(PutJobAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());
client().execute(PutJobAction.INSTANCE, request).actionGet();
}
private Bucket createBucket(boolean isInterim) {

View File

@ -60,8 +60,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Job.Builder job = createJob("fail-over-basics-job", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
@ -91,8 +90,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
job.setDataDescription(new DataDescription.Builder());
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
MaxAggregationBuilder maxAggregation = AggregationBuilders.max("time").field("time");
@ -103,8 +101,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
configBuilder.setFrequency(TimeValue.timeValueMinutes(2));
DatafeedConfig config = configBuilder.build();
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
assertTrue(putDatadeedResponse.isAcknowledged());
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
@ -165,14 +162,11 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
Job.Builder job = createScheduledJob("job_id");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
DatafeedConfig config = createDatafeed("data_feed_id", job.getId(), Collections.singletonList("data"));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest)
.actionGet();
assertTrue(putDatadeedResponse.isAcknowledged());
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
@ -204,8 +198,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
String jobId = "dedicated-ml-node-job";
Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
@ -286,8 +279,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i), new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
@ -347,8 +339,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
String jobId = "ml-indices-not-available-job";
Job.Builder job = createFareQuoteJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();

View File

@ -30,14 +30,13 @@ public class DeleteJobIT extends BaseMlIntegTestCase {
final String jobId = "wait-for-delete-job";
Job.Builder job = createJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
CountDownLatch markAsDeletedLatch = new CountDownLatch(1);
clusterService().submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
return markJobAsDeleted(jobId, currentState);
}

View File

@ -218,8 +218,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
Job.Builder job = createJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
}
private void createBuckets(String jobId, int count) {

View File

@ -504,8 +504,7 @@ public class JobProviderIT extends MlSingleNodeTestCase {
builder.setDataDescription(dc);
PutJobAction.Request request = new PutJobAction.Request(builder);
PutJobAction.Response response = client().execute(PutJobAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());
client().execute(PutJobAction.INSTANCE, request).actionGet();
return builder;
}

View File

@ -31,8 +31,7 @@ public class JobStorageDeletionTaskIT extends BaseMlIntegTestCase {
Job.Builder job = createJob("delete-aliases-test-job", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();

View File

@ -171,14 +171,11 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
private void setupJobAndDatafeed(String jobId, String datafeedId) throws Exception {
Job.Builder job = createScheduledJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
PutDatafeedAction.Response putDatadeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest)
.actionGet();
assertTrue(putDatadeedResponse.isAcknowledged());
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
assertBusy(() -> {

View File

@ -58,8 +58,7 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase {
Job.Builder job = createJob("relocation-job", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
ensureGreen();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());

View File

@ -34,8 +34,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
// create and open first job, which succeeds:
Job.Builder job = createJob("close-failed-job-1", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
@ -46,8 +45,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
// create and try to open second job, which fails:
job = createJob("close-failed-job-2", new ByteSizeValue(2, ByteSizeUnit.MB));
putJobRequest = new PutJobAction.Request(job);
putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
expectThrows(ElasticsearchStatusException.class,
() -> client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request("close-failed-job-2")).actionGet());
@ -82,8 +80,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) {
Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i), jobModelMemoryLimit);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
try {

View File

@ -216,7 +216,6 @@ setup:
job_id: "revert-model-snapshot"
snapshot_id: "first"
- match: { acknowledged: true }
- match: { model.job_id: "revert-model-snapshot" }
- match: { model.timestamp: 1464825600000 }
- match: { model.snapshot_id: "first" }
@ -227,7 +226,6 @@ setup:
job_id: "revert-model-snapshot"
snapshot_id: "second"
- match: { acknowledged: true }
- match: { model.job_id: "revert-model-snapshot" }
- match: { model.timestamp: 1464739200000 }
- match: { model.snapshot_id: "second" }

View File

@ -8,13 +8,11 @@ package org.elasticsearch.xpack.watcher.rest.action;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.AcknowledgedRestListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceRequest;
import org.elasticsearch.xpack.watcher.rest.WatcherRestHandler;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestWatchServiceAction extends WatcherRestHandler {
@ -31,10 +29,8 @@ public class RestWatchServiceAction extends WatcherRestHandler {
}
@Override
public RestChannelConsumer doPrepareRequest(RestRequest request, WatcherClient client)
throws IOException {
return channel -> client.watcherService(new WatcherServiceRequest().start(),
new AcknowledgedRestListener<>(channel));
public RestChannelConsumer doPrepareRequest(RestRequest request, WatcherClient client) {
return channel -> client.watcherService(new WatcherServiceRequest().start(), new RestToXContentListener<>(channel));
}
private static class StopRestHandler extends WatcherRestHandler {
@ -49,10 +45,8 @@ public class RestWatchServiceAction extends WatcherRestHandler {
}
@Override
public RestChannelConsumer doPrepareRequest(RestRequest request, WatcherClient client)
throws IOException {
return channel -> client.watcherService(new WatcherServiceRequest().stop(), new
AcknowledgedRestListener<>(channel));
public RestChannelConsumer doPrepareRequest(RestRequest request, WatcherClient client) {
return channel -> client.watcherService(new WatcherServiceRequest().stop(), new RestToXContentListener<>(channel));
}
}
}

View File

@ -68,7 +68,6 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
Job.Builder job = createScheduledJob("lookback-job");
registerJob(job);
PutJobAction.Response putJobResponse = putJob(job);
assertTrue(putJobResponse.isAcknowledged());
assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
@ -78,7 +77,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
t.add("data-2");
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
registerDatafeed(datafeedConfig);
assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now);
assertBusy(() -> {
@ -245,7 +244,6 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
Job.Builder job = createScheduledJob("lookback-job-stopped-then-killed");
registerJob(job);
PutJobAction.Response putJobResponse = putJob(job);
assertTrue(putJobResponse.isAcknowledged());
assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
@ -256,8 +254,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
datafeedConfigBuilder.setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS)));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerDatafeed(datafeedConfig);
assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
@ -291,14 +288,13 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
Job.Builder job = createScheduledJob(jobId);
registerJob(job);
assertTrue(putJob(job).isAcknowledged());
putJob(job);
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
registerDatafeed(datafeedConfig);
assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, null);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());

View File

@ -55,7 +55,6 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
PutJobAction.Response putJobResponse = mlClient.putJob(new PutJobAction.Request(job)).actionGet();
assertThat(putJobResponse, notNullValue());
assertThat(putJobResponse.isAcknowledged(), equalTo(true));
GetJobsAction.Response getJobResponse = mlClient.getJobs(new GetJobsAction.Request(jobId)).actionGet();
assertThat(getJobResponse, notNullValue());
@ -131,7 +130,6 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
PutJobAction.Response putJobResponse = mlClient.putJob(new PutJobAction.Request(job)).actionGet();
assertThat(putJobResponse, notNullValue());
assertThat(putJobResponse.isAcknowledged(), equalTo(true));
String datafeedId = "ml-transport-client-it-datafeed";
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder(datafeedId, jobId);
@ -140,8 +138,7 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
datafeed.setIndices(Collections.singletonList(datafeedIndex));
datafeed.setTypes(Collections.singletonList("type-bar"));
PutDatafeedAction.Response putDatafeedResponse = mlClient.putDatafeed(new PutDatafeedAction.Request(datafeed.build())).actionGet();
assertThat(putDatafeedResponse.isAcknowledged(), equalTo(true));
mlClient.putDatafeed(new PutDatafeedAction.Request(datafeed.build())).actionGet();
GetDatafeedsAction.Response getDatafeedResponse = mlClient.getDatafeeds(new GetDatafeedsAction.Request(datafeedId)).actionGet();
assertThat(getDatafeedResponse.getResponse(), notNullValue());