[CCR] Change FollowIndexAction.Request class to be more user friendly (#33810)

Instead of having one constructor that accepts all arguments, all parameters
should be provided via setters. Only leader and follower index are required
arguments. This makes using this class in tests and transport client easier.
This commit is contained in:
Martijn van Groningen 2018-09-19 07:18:24 +02:00 committed by GitHub
parent c6e3231ef3
commit 013b64a07c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 269 additions and 192 deletions

View File

@ -199,13 +199,13 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

View File

@ -141,13 +141,13 @@ public class FollowIndexIT extends ESRestTestCase {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}
private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

View File

@ -297,12 +297,16 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
clusterAlias + ":" + leaderIndexName;
FollowIndexAction.Request request =
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
pattern.getIdleShardRetryDelay());
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix);
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes());
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
request.setPollTimeout(pattern.getIdleShardRetryDelay());
// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {

View File

@ -32,7 +32,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import java.io.IOException;
import java.util.ArrayList;
@ -64,8 +63,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT;
private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
public Request(ShardId shardId, String expectedHistoryUUID) {
super(shardId.getIndexName());

View File

@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
@ -55,6 +56,14 @@ import java.util.stream.Collectors;
public class TransportFollowIndexAction extends HandledTransportAction<FollowIndexAction.Request, AcknowledgedResponse> {
static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024;
private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1;
static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
@ -179,19 +188,8 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata);
String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId];
ShardFollowTask shardFollowTask = new ShardFollowTask(
clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.getMaxBatchOperationCount(),
request.getMaxConcurrentReadBatches(),
request.getMaxOperationSizeInBytes(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getPollTimeout(),
recordedLeaderShardHistoryUUID,
filteredHeaders);
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
@ -299,6 +297,69 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
}
private static ShardFollowTask createShardFollowTask(
int shardId,
String clusterAliasName,
FollowIndexAction.Request request,
IndexMetaData leaderIndexMetadata,
IndexMetaData followIndexMetadata,
String recordedLeaderShardHistoryUUID,
Map<String, String> filteredHeaders
) {
int maxBatchOperationCount;
if (request.getMaxBatchOperationCount() != null) {
maxBatchOperationCount = request.getMaxBatchOperationCount();
} else {
maxBatchOperationCount = DEFAULT_MAX_BATCH_OPERATION_COUNT;
}
int maxConcurrentReadBatches;
if (request.getMaxConcurrentReadBatches() != null){
maxConcurrentReadBatches = request.getMaxConcurrentReadBatches();
} else {
maxConcurrentReadBatches = DEFAULT_MAX_CONCURRENT_READ_BATCHES;
}
long maxOperationSizeInBytes;
if (request.getMaxOperationSizeInBytes() != null) {
maxOperationSizeInBytes = request.getMaxOperationSizeInBytes();
} else {
maxOperationSizeInBytes = DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
}
int maxConcurrentWriteBatches;
if (request.getMaxConcurrentWriteBatches() != null) {
maxConcurrentWriteBatches = request.getMaxConcurrentWriteBatches();
} else {
maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
}
int maxWriteBufferSize;
if (request.getMaxWriteBufferSize() != null) {
maxWriteBufferSize = request.getMaxWriteBufferSize();
} else {
maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE;
}
TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay();
TimeValue pollTimeout = request.getPollTimeout() == null ? DEFAULT_POLL_TIMEOUT : request.getPollTimeout();
return new ShardFollowTask(
clusterAliasName,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
maxBatchOperationCount,
maxConcurrentReadBatches,
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,
recordedLeaderShardHistoryUUID,
filteredHeaders
);
}
private static String[] extractIndexShardHistoryUUIDs(Map<String, String> ccrIndexMetaData) {
String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS);
return historyUUIDs.split(",");

View File

@ -192,16 +192,12 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
}
private FollowIndexAction.Request getFollowRequest() {
return new FollowIndexAction.Request(
"leader",
"follower",
FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES,
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES,
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10));
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex("leader");
request.setFollowerIndex("follower");
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
}

View File

@ -319,9 +319,11 @@ public class ShardChangesIT extends ESIntegTestCase {
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
atLeastDocsIndexed("index1", numDocsIndexed / 3);
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize,
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
followRequest.setMaxBatchOperationCount(maxReadSize);
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240));
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
@ -358,9 +360,10 @@ public class ShardChangesIT extends ESIntegTestCase {
});
thread.start();
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048),
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048));
followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
@ -447,7 +450,7 @@ public class ShardChangesIT extends ESIntegTestCase {
.actionGet());
}
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
@ -460,8 +463,8 @@ public class ShardChangesIT extends ESIntegTestCase {
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
followRequest.setMaxOperationSizeInBytes(1L);
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
@ -489,25 +492,21 @@ public class ShardChangesIT extends ESIntegTestCase {
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index3");
FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
followRequest = createFollowRequest("index3", "index4");
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
unfollowIndex("index2", "index4");
FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4");
Exception e = expectThrows(IllegalArgumentException.class,
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2");
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
}
@ -716,10 +715,12 @@ public class ShardChangesIT extends ESIntegTestCase {
}, 60, TimeUnit.SECONDS);
}
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) {
return new FollowIndexAction.Request(leaderIndex, followIndex, FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE,
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10));
public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(leaderIndex);
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
}

View File

@ -40,24 +40,49 @@ public class FollowIndexRequestTests extends AbstractStreamableXContentTestCase<
}
static FollowIndexAction.Request createTestRequest() {
return new FollowIndexAction.Request(randomAlphaOfLength(4), randomAlphaOfLength(4), randomIntBetween(1, Integer.MAX_VALUE),
randomIntBetween(1, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(1, Integer.MAX_VALUE),
randomIntBetween(1, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500));
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex(randomAlphaOfLength(4));
request.setFollowerIndex(randomAlphaOfLength(4));
if (randomBoolean()) {
request.setMaxBatchOperationCount(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentReadBatches(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentWriteBatches(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setPollTimeout(TimeValue.timeValueMillis(500));
}
return request;
}
public void testValidate() {
FollowIndexAction.Request request = new FollowIndexAction.Request("index1", "index2", null, null, null, null,
null, TimeValue.ZERO, null);
FollowIndexAction.Request request = new FollowIndexAction.Request();
request.setLeaderIndex("index1");
request.setFollowerIndex("index2");
request.setMaxRetryDelay(TimeValue.ZERO);
ActionRequestValidationException validationException = request.validate();
assertThat(validationException, notNullValue());
assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]"));
request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, null, TimeValue.timeValueMinutes(10), null);
request.setMaxRetryDelay(TimeValue.timeValueMinutes(10));
validationException = request.validate();
assertThat(validationException, notNullValue());
assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]"));
request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, null, TimeValue.timeValueMinutes(1), null);
request.setMaxRetryDelay(TimeValue.timeValueMinutes(1));
validationException = request.validate();
assertThat(validationException, nullValue());
}

View File

@ -15,7 +15,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import java.nio.charset.StandardCharsets;
@ -81,7 +80,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
new ShardId("leader_index", "", 0),
testRun.maxOperationCount,
concurrency,
FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
concurrency,
10240,
TimeValue.timeValueMillis(10),

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,14 +29,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
public static final FollowIndexAction INSTANCE = new FollowIndexAction();
public static final String NAME = "cluster:admin/xpack/ccr/follow_index";
public static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240;
public static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024;
public static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1;
public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
public static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
public static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
private FollowIndexAction() {
super(NAME);
@ -59,30 +51,23 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
private static final ConstructingObjectParser<Request, String> PARSER = new ConstructingObjectParser<>(NAME, true,
(args, followerIndex) -> {
if (args[1] != null) {
followerIndex = (String) args[1];
}
return new Request((String) args[0], followerIndex, (Integer) args[2], (Integer) args[3], (Long) args[4],
(Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]);
});
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, Request::new);
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_READ_BATCHES);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_SIZE_IN_BYTES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD);
PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD);
PARSER.declareInt(Request::setMaxBatchOperationCount, MAX_BATCH_OPERATION_COUNT);
PARSER.declareInt(Request::setMaxConcurrentReadBatches, MAX_CONCURRENT_READ_BATCHES);
PARSER.declareLong(Request::setMaxOperationSizeInBytes, MAX_BATCH_SIZE_IN_BYTES);
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(Request::setMaxWriteBufferSize, MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
MAX_RETRY_DELAY_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
Request::setPollTimeout,
(p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
POLL_TIMEOUT,
ObjectParser.ValueType.STRING);
@ -108,6 +93,9 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
return leaderIndex;
}
public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}
private String followerIndex;
@ -115,38 +103,66 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
return followerIndex;
}
private int maxBatchOperationCount;
public void setFollowerIndex(String followerIndex) {
this.followerIndex = followerIndex;
}
public int getMaxBatchOperationCount() {
private Integer maxBatchOperationCount;
public Integer getMaxBatchOperationCount() {
return maxBatchOperationCount;
}
private int maxConcurrentReadBatches;
public void setMaxBatchOperationCount(Integer maxBatchOperationCount) {
this.maxBatchOperationCount = maxBatchOperationCount;
}
public int getMaxConcurrentReadBatches() {
private Integer maxConcurrentReadBatches;
public Integer getMaxConcurrentReadBatches() {
return maxConcurrentReadBatches;
}
private long maxOperationSizeInBytes;
public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) {
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
}
public long getMaxOperationSizeInBytes() {
private Long maxOperationSizeInBytes;
public Long getMaxOperationSizeInBytes() {
return maxOperationSizeInBytes;
}
private int maxConcurrentWriteBatches;
public void setMaxOperationSizeInBytes(Long maxOperationSizeInBytes) {
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
}
public int getMaxConcurrentWriteBatches() {
private Integer maxConcurrentWriteBatches;
public Integer getMaxConcurrentWriteBatches() {
return maxConcurrentWriteBatches;
}
private int maxWriteBufferSize;
public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) {
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
}
public int getMaxWriteBufferSize() {
private Integer maxWriteBufferSize;
public Integer getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}
private TimeValue maxRetryDelay;
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
@ -157,88 +173,50 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
return pollTimeout;
}
public Request(
final String leaderIndex,
final String followerIndex,
final Integer maxBatchOperationCount,
final Integer maxConcurrentReadBatches,
final Long maxOperationSizeInBytes,
final Integer maxConcurrentWriteBatches,
final Integer maxWriteBufferSize,
final TimeValue maxRetryDelay,
final TimeValue pollTimeout) {
if (leaderIndex == null) {
throw new IllegalArgumentException(LEADER_INDEX_FIELD.getPreferredName() + " is missing");
}
if (followerIndex == null) {
throw new IllegalArgumentException(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing");
}
final int actualMaxBatchOperationCount =
maxBatchOperationCount == null ? DEFAULT_MAX_BATCH_OPERATION_COUNT : maxBatchOperationCount;
if (actualMaxBatchOperationCount < 1) {
throw new IllegalArgumentException(MAX_BATCH_OPERATION_COUNT.getPreferredName() + " must be larger than 0");
}
final int actualMaxConcurrentReadBatches =
maxConcurrentReadBatches == null ? DEFAULT_MAX_CONCURRENT_READ_BATCHES : maxConcurrentReadBatches;
if (actualMaxConcurrentReadBatches < 1) {
throw new IllegalArgumentException(MAX_CONCURRENT_READ_BATCHES.getPreferredName() + " must be larger than 0");
}
final long actualMaxOperationSizeInBytes =
maxOperationSizeInBytes == null ? DEFAULT_MAX_BATCH_SIZE_IN_BYTES : maxOperationSizeInBytes;
if (actualMaxOperationSizeInBytes <= 0) {
throw new IllegalArgumentException(MAX_BATCH_SIZE_IN_BYTES.getPreferredName() + " must be larger than 0");
}
final int actualMaxConcurrentWriteBatches =
maxConcurrentWriteBatches == null ? DEFAULT_MAX_CONCURRENT_WRITE_BATCHES : maxConcurrentWriteBatches;
if (actualMaxConcurrentWriteBatches < 1) {
throw new IllegalArgumentException(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0");
}
final int actualMaxWriteBufferSize = maxWriteBufferSize == null ? DEFAULT_MAX_WRITE_BUFFER_SIZE : maxWriteBufferSize;
if (actualMaxWriteBufferSize < 1) {
throw new IllegalArgumentException(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0");
}
final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_MAX_RETRY_DELAY : maxRetryDelay;
final TimeValue actualPollTimeout = pollTimeout == null ? DEFAULT_POLL_TIMEOUT : pollTimeout;
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.maxBatchOperationCount = actualMaxBatchOperationCount;
this.maxConcurrentReadBatches = actualMaxConcurrentReadBatches;
this.maxOperationSizeInBytes = actualMaxOperationSizeInBytes;
this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches;
this.maxWriteBufferSize = actualMaxWriteBufferSize;
this.maxRetryDelay = actualRetryTimeout;
this.pollTimeout = actualPollTimeout;
public void setPollTimeout(TimeValue pollTimeout) {
this.pollTimeout = pollTimeout;
}
public Request() {
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
ActionRequestValidationException e = null;
if (maxRetryDelay.millis() <= 0) {
if (leaderIndex == null) {
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
}
if (followerIndex == null) {
e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e);
}
if (maxBatchOperationCount != null && maxBatchOperationCount < 1) {
e = addValidationError(MAX_BATCH_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxConcurrentReadBatches != null && maxConcurrentReadBatches < 1) {
e = addValidationError(MAX_CONCURRENT_READ_BATCHES.getPreferredName() + " must be larger than 0", e);
}
if (maxOperationSizeInBytes != null && maxOperationSizeInBytes <= 0) {
e = addValidationError(MAX_BATCH_SIZE_IN_BYTES.getPreferredName() + " must be larger than 0", e);
}
if (maxConcurrentWriteBatches != null && maxConcurrentWriteBatches < 1) {
e = addValidationError(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferSize != null && maxWriteBufferSize < 1) {
e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) {
String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be positive but was [" +
maxRetryDelay.getStringRep() + "]";
validationException = addValidationError(message, validationException);
e = addValidationError(message, e);
}
if (maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) {
if (maxRetryDelay != null && maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) {
String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be less than [" + MAX_RETRY_DELAY +
"] but was [" + maxRetryDelay.getStringRep() + "]";
validationException = addValidationError(message, validationException);
e = addValidationError(message, e);
}
return validationException;
return e;
}
@Override
@ -246,11 +224,11 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
super.readFrom(in);
leaderIndex = in.readString();
followerIndex = in.readString();
maxBatchOperationCount = in.readVInt();
maxConcurrentReadBatches = in.readVInt();
maxOperationSizeInBytes = in.readVLong();
maxConcurrentWriteBatches = in.readVInt();
maxWriteBufferSize = in.readVInt();
maxBatchOperationCount = in.readOptionalVInt();
maxConcurrentReadBatches = in.readOptionalVInt();
maxOperationSizeInBytes = in.readOptionalLong();
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
maxRetryDelay = in.readOptionalTimeValue();
pollTimeout = in.readOptionalTimeValue();
}
@ -260,11 +238,11 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followerIndex);
out.writeVInt(maxBatchOperationCount);
out.writeVInt(maxConcurrentReadBatches);
out.writeVLong(maxOperationSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeOptionalVInt(maxBatchOperationCount);
out.writeOptionalVInt(maxConcurrentReadBatches);
out.writeOptionalLong(maxOperationSizeInBytes);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(pollTimeout);
}
@ -275,13 +253,27 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
{
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
if (maxBatchOperationCount != null) {
builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
}
if (maxOperationSizeInBytes != null) {
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
}
if (maxConcurrentReadBatches != null) {
builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
}
if (maxConcurrentWriteBatches != null) {
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
}
if (pollTimeout != null) {
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
}
}
builder.endObject();
return builder;
@ -292,11 +284,11 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return maxBatchOperationCount == request.maxBatchOperationCount &&
maxConcurrentReadBatches == request.maxConcurrentReadBatches &&
maxOperationSizeInBytes == request.maxOperationSizeInBytes &&
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
maxWriteBufferSize == request.maxWriteBufferSize &&
return Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) &&
Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) &&
Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) &&
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(pollTimeout, request.pollTimeout) &&
Objects.equals(leaderIndex, request.leaderIndex) &&