[CCR] added rest specs and simple rest test for follow and unfollow apis (#30123)

[CCR] added rest specs and simple rest test for follow and unfollow apis, also

Added an acknowledge field in follow and unfollow api responses. Currently these api return an empty response and fixed bug in unfollow api that didn't cleanup node tasks properly.
This commit is contained in:
Martijn van Groningen 2018-05-07 14:18:28 +02:00 committed by GitHub
parent 2c73969505
commit ad499fc178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 143 additions and 36 deletions

View File

@ -10,10 +10,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -139,8 +139,26 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
} }
} }
public static class Response extends ActionResponse { public static class Response extends AcknowledgedResponse {
Response() {
}
Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
} }
public static class TransportAction extends HandledTransportAction<Request, Response> { public static class TransportAction extends HandledTransportAction<Request, Response> {
@ -261,7 +279,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
if (error == null) { if (error == null) {
// include task ids? // include task ids?
handler.onResponse(new Response()); handler.onResponse(new Response(true));
} else { } else {
// TODO: cancel all started tasks // TODO: cancel all started tasks
handler.onFailure(error); handler.onFailure(error);

View File

@ -29,6 +29,11 @@ public class ShardFollowNodeTask extends AllocatedPersistentTask {
super(id, type, action, description, parentTask, headers); super(id, type, action, description, parentTask, headers);
} }
@Override
protected void onCancelled() {
markAsCompleted();
}
void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
this.processedGlobalCheckpoint.set(processedGlobalCheckpoint); this.processedGlobalCheckpoint.set(processedGlobalCheckpoint);
} }

View File

@ -10,10 +10,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -81,13 +81,32 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Request, Unf
} }
} }
public static class Response extends ActionResponse { public static class Response extends AcknowledgedResponse {
Response(boolean acknowledged) {
super(acknowledged);
}
Response() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
} }
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> { public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client) { RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request()); super(client, INSTANCE, new Request());
} }
} }
@ -147,7 +166,7 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Request, Unf
if (error == null) { if (error == null) {
// include task ids? // include task ids?
listener.onResponse(new Response()); listener.onResponse(new Response(true));
} else { } else {
// TODO: cancel all started tasks // TODO: cancel all started tasks
listener.onFailure(error); listener.onFailure(error);

View File

@ -7,21 +7,16 @@ package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE; import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request; import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Request;
import static org.elasticsearch.xpack.ccr.action.FollowExistingIndexAction.Response;
// TODO: change to confirm with API design // TODO: change to confirm with API design
public class RestFollowExistingIndexAction extends BaseRestHandler { public class RestFollowExistingIndexAction extends BaseRestHandler {
@ -52,13 +47,6 @@ public class RestFollowExistingIndexAction extends BaseRestHandler {
long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName()));
request.setProcessorMaxTranslogBytes(value); request.setProcessorMaxTranslogBytes(value);
} }
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) { return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.endObject());
}
});
} }
} }

View File

@ -7,20 +7,15 @@ package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE; import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.INSTANCE;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request; import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Request;
import static org.elasticsearch.xpack.ccr.action.UnfollowIndexAction.Response;
// TODO: change to confirm with API design // TODO: change to confirm with API design
public class RestUnfollowIndexAction extends BaseRestHandler { public class RestUnfollowIndexAction extends BaseRestHandler {
@ -40,13 +35,6 @@ public class RestUnfollowIndexAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Request request = new Request(); Request request = new Request();
request.setFollowIndex(restRequest.param("follow_index")); request.setFollowIndex(restRequest.param("follow_index"));
return channel -> client.execute(INSTANCE, request, new RestBuilderListener<Response>(channel) { return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
return new BytesRestResponse(RestStatus.OK, builder.startObject()
.endObject());
}
});
} }
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ccr; package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats;
@ -204,6 +205,17 @@ public class ShardChangesIT extends ESIntegTestCase {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0)); assertThat(tasks.tasks().size(), equalTo(0));
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
}); });
} }

View File

@ -0,0 +1,24 @@
{
"xpack.ccr.follow_existing_index": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ccr/{follow_index}/_follow",
"paths": [ "/_xpack/ccr/{follow_index}/_follow" ],
"parts": {
"follow_index": {
"type": "string",
"required": true,
"description": "The name of the index that follows to leader index."
}
},
"params": {
"leader_index": {
"type": "string",
"required": true,
"description": "The name of the index to read the changes from."
}
}
}
}
}

View File

@ -0,0 +1,19 @@
{
"xpack.ccr.unfollow_index": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
"methods": [ "POST" ],
"url": {
"path": "/_xpack/ccr/{follow_index}/_unfollow",
"paths": [ "/_xpack/ccr/{follow_index}/_unfollow" ],
"parts": {
"follow_index": {
"type": "string",
"required": true,
"description": "The name of the follow index that should stop following its leader index."
}
},
"params": {
}
}
}
}

View File

@ -0,0 +1,34 @@
---
"Test follow and unfollow an existing index":
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
field:
type: keyword
- is_true: acknowledged
- do:
indices.create:
index: bar
body:
mappings:
doc:
properties:
field:
type: keyword
- is_true: acknowledged
- do:
xpack.ccr.follow_existing_index:
leader_index: foo
follow_index: bar
- is_true: acknowledged
- do:
xpack.ccr.unfollow_index:
follow_index: bar
- is_true: acknowledged