[CCR] Move api parameters from url to request body. (#31949)

Relates to #30102
This commit is contained in:
Martijn van Groningen 2018-07-11 10:16:43 +02:00 committed by GitHub
parent 8e1ef0cff9
commit 815faf34fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 153 additions and 82 deletions

View File

@ -145,15 +145,13 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}

View File

@ -95,15 +95,13 @@ public class FollowIndexIT extends ESRestTestCase {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}

View File

@ -263,7 +263,7 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
String followIndex = request.getFollowRequest().getFollowIndex();
String followIndex = request.getFollowRequest().getFollowerIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
@ -294,10 +294,10 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowIndex()));
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowIndex() + "] created");
"follow index [" + request.getFollowRequest().getFollowerIndex() + "] created");
logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());
@ -308,7 +308,7 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
}
private void initiateFollowing(Request request, ActionListener<Response> listener) {
activeShardsObserver.waitForActiveShards(new String[]{request.followRequest.getFollowIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.followRequest.getFollowerIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(FollowIndexAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
@ -323,7 +323,7 @@ public class CreateAndFollowIndexAction extends Action<CreateAndFollowIndexActio
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowIndex());
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
}
}

View File

@ -19,12 +19,18 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
@ -68,10 +74,51 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
return new Response();
}
public static class Request extends ActionRequest {
public static class Request extends ActionRequest implements ToXContentObject {
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
private static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
private static final ConstructingObjectParser<Request, String> PARSER = new ConstructingObjectParser<>(NAME, true,
(args, followerIndex) -> {
if (args[1] != null) {
followerIndex = (String) args[1];
}
return new Request((String) args[0], followerIndex, (Integer) args[2], (Integer) args[3], (Long) args[4],
(Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]);
});
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_BATCH_OPERATION_COUNT);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_CONCURRENT_READ_BATCHES);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.RETRY_TIMEOUT.getPreferredName()),
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
}
public static Request fromXContent(XContentParser parser, String followerIndex) throws IOException {
Request request = PARSER.parse(parser, followerIndex);
if (followerIndex != null) {
if (request.followerIndex == null) {
request.followerIndex = followerIndex;
} else {
if (request.followerIndex.equals(followerIndex) == false) {
throw new IllegalArgumentException("provided follower_index is not equal");
}
}
}
return request;
}
private String leaderIndex;
private String followIndex;
private String followerIndex;
private int maxBatchOperationCount;
private int maxConcurrentReadBatches;
private long maxOperationSizeInBytes;
@ -80,9 +127,37 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
private TimeValue retryTimeout;
private TimeValue idleShardRetryDelay;
public Request(String leaderIndex, String followIndex, int maxBatchOperationCount, int maxConcurrentReadBatches,
long maxOperationSizeInBytes, int maxConcurrentWriteBatches, int maxWriteBufferSize,
public Request(String leaderIndex, String followerIndex, Integer maxBatchOperationCount, Integer maxConcurrentReadBatches,
Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, Integer maxWriteBufferSize,
TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader_index is missing");
}
if (followerIndex == null) {
throw new IllegalArgumentException("follower_index is missing");
}
if (maxBatchOperationCount == null) {
maxBatchOperationCount = ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT;
}
if (maxConcurrentReadBatches == null) {
maxConcurrentReadBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES;
}
if (maxOperationSizeInBytes == null) {
maxOperationSizeInBytes = ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
}
if (maxConcurrentWriteBatches == null) {
maxConcurrentWriteBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
}
if (maxWriteBufferSize == null) {
maxWriteBufferSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE;
}
if (retryTimeout == null) {
retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT;
}
if (idleShardRetryDelay == null) {
idleShardRetryDelay = ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY;
}
if (maxBatchOperationCount < 1) {
throw new IllegalArgumentException("maxBatchOperationCount must be larger than 0");
}
@ -99,15 +174,15 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
throw new IllegalArgumentException("maxWriteBufferSize must be larger than 0");
}
this.leaderIndex = Objects.requireNonNull(leaderIndex);
this.followIndex = Objects.requireNonNull(followIndex);
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.maxBatchOperationCount = maxBatchOperationCount;
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = Objects.requireNonNull(retryTimeout);
this.idleShardRetryDelay = Objects.requireNonNull(idleShardRetryDelay);
this.retryTimeout = retryTimeout;
this.idleShardRetryDelay = idleShardRetryDelay;
}
Request() {
@ -117,8 +192,8 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
return leaderIndex;
}
public String getFollowIndex() {
return followIndex;
public String getFollowerIndex() {
return followerIndex;
}
public int getMaxBatchOperationCount() {
@ -134,7 +209,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
followerIndex = in.readString();
maxBatchOperationCount = in.readVInt();
maxConcurrentReadBatches = in.readVInt();
maxOperationSizeInBytes = in.readVLong();
@ -148,7 +223,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeString(followerIndex);
out.writeVInt(maxBatchOperationCount);
out.writeVInt(maxConcurrentReadBatches);
out.writeVLong(maxOperationSizeInBytes);
@ -158,6 +233,24 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
out.writeOptionalTimeValue(idleShardRetryDelay);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
builder.field(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
builder.field(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -171,12 +264,12 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followIndex, request.followIndex);
Objects.equals(followerIndex, request.followerIndex);
}
@Override
public int hashCode() {
return Objects.hash(leaderIndex, followIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
return Objects.hash(leaderIndex, followerIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay);
}
}
@ -216,7 +309,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followerIndex);
String[] indices = new String[]{request.leaderIndex};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
@ -378,7 +471,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Response> {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}
if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist");
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");

View File

@ -7,13 +7,11 @@ package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import java.io.IOException;
@ -38,32 +36,9 @@ public class RestFollowIndexAction extends BaseRestHandler {
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}
static Request createRequest(RestRequest restRequest) {
int maxBatchOperationCount = ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT;
if (restRequest.hasParam(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName())) {
maxBatchOperationCount = Integer.valueOf(restRequest.param(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName()));
}
int maxConcurrentReadBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES;
if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName())) {
maxConcurrentReadBatches = Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName()));
}
long maxBatchSizeInBytes = ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
if (restRequest.hasParam(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName())) {
maxBatchSizeInBytes = Long.valueOf(restRequest.param(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName()));
}
int maxConcurrentWriteBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName())) {
maxConcurrentWriteBatches = Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName()));
}
int maxWriteBufferSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE;
if (restRequest.hasParam(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName())) {
maxWriteBufferSize = Integer.parseInt(restRequest.param(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName()));
}
TimeValue retryTimeout = restRequest.paramAsTime(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(),
ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT);
TimeValue idleShardRetryTimeout = restRequest.paramAsTime(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(),
ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY);
return new Request(restRequest.param("leader_index"), restRequest.param("index"), maxBatchOperationCount, maxConcurrentReadBatches,
maxBatchSizeInBytes, maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryTimeout);
static Request createRequest(RestRequest restRequest) throws IOException {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
return Request.fromXContent(parser, restRequest.param("index"));
}
}
}

View File

@ -439,7 +439,7 @@ public class ShardChangesIT extends ESIntegTestCase {
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024,
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

View File

@ -6,9 +6,12 @@
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
public class FollowIndexRequestTests extends AbstractStreamableTestCase<FollowIndexAction.Request> {
import java.io.IOException;
public class FollowIndexRequestTests extends AbstractStreamableXContentTestCase<FollowIndexAction.Request> {
@Override
protected FollowIndexAction.Request createBlankInstance() {
@ -20,6 +23,16 @@ public class FollowIndexRequestTests extends AbstractStreamableTestCase<FollowIn
return createTestRequest();
}
@Override
protected FollowIndexAction.Request doParseInstance(XContentParser parser) throws IOException {
return FollowIndexAction.Request.fromXContent(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
static FollowIndexAction.Request createTestRequest() {
return new FollowIndexAction.Request(randomAlphaOfLength(4), randomAlphaOfLength(4), randomIntBetween(1, Integer.MAX_VALUE),
randomIntBetween(1, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(1, Integer.MAX_VALUE),

View File

@ -9,16 +9,13 @@
"index": {
"type": "string",
"required": true,
"description": "The name of the index that follows the leader index."
"description": "The name of the follower index"
}
}
},
"params": {
"leader_index": {
"type": "string",
"required": true,
"description": "The name of the index to read the changes from."
}
}
"body": {
"description" : "The name of the leader index and other optional ccr related parameters",
"required" : true
}
}
}

View File

@ -9,16 +9,13 @@
"index": {
"type": "string",
"required": true,
"description": "The name of the index that follows to leader index."
"description": "The name of the follower index."
}
}
},
"params": {
"leader_index": {
"type": "string",
"required": true,
"description": "The name of the index to read the changes from."
}
}
"body": {
"description" : "The name of the leader index and other optional ccr related parameters",
"required" : true
}
}
}

View File

@ -9,10 +9,8 @@
"index": {
"type": "string",
"required": true,
"description": "The name of the follow index that should stop following its leader index."
}
},
"params": {
"description": "The name of the follower index that should stop following its leader index."
}
}
}
}

View File

@ -17,8 +17,9 @@
- do:
xpack.ccr.create_and_follow_index:
leader_index: foo
index: bar
body:
leader_index: foo
- is_true: follow_index_created
- is_true: follow_index_shards_acked
- is_true: index_following_started
@ -30,8 +31,9 @@
- do:
xpack.ccr.follow_index:
leader_index: foo
index: bar
body:
leader_index: foo
- is_true: acknowledged
- do: