Clean up duplicate follow config parameter code (#37688)

Introduced FollowParameters class that put follow, resume follow,
put auto follow pattern requests and follow info response classes reuse.

The FollowParameters class had the fields, getters etc. for the common parameters
that all these APIs have.  Also binary and xcontent serialization /
parsing is handled by this class.

The follow, resume follow, put auto follow pattern request classes originally
used optional non primitive fields, so FollowParameters has that too and the follow info api can handle that now too.

Also the followerIndex field can in production only be specified via
the url path. If it is also specified via the request body then
it must have the same value as is specified in the url path. This
option only existed to xcontent testing. However the AbstractSerializingTestCase
base class now also supports createXContextTestInstance() to provide
a different test instance when testing xcontent, so allowing followerIndex
to be specified via the request body is no longer needed.

By moving the followerIndex field from Body to ResumeFollowAction.Request
class and not allowing the followerIndex field to be specified via
the request body the Body class is redundant and can be removed. The
ResumeFollowAction.Request class can then directly use the
FollowParameters class.

For consistency I also removed the ability to specified followerIndex
in the put follow api and the name in put auto follow pattern api via
the request body.
This commit is contained in:
Martijn van Groningen 2019-02-05 17:05:19 +01:00 committed by GitHub
parent 2f6afd290e
commit 0beb3c93d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 705 additions and 1016 deletions

View File

@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
private final String remoteCluster;
private final String leaderIndex;
@ -55,7 +54,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
builder.startObject();
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;

View File

@ -26,8 +26,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD;
public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject {
private final String followerIndex;
@ -39,7 +37,6 @@ public final class ResumeFollowRequest extends FollowConfig implements Validatab
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;

View File

@ -31,12 +31,11 @@ import java.io.IOException;
public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowRequest> {
private static final ConstructingObjectParser<PutFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], (String) args[2]));
true, (args) -> new PutFollowRequest((String) args[0], (String) args[1], "followerIndex"));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(PutFollowRequest::setMaxReadRequestOperationCount, PutFollowRequest.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
PutFollowRequest::setMaxReadRequestSize,
@ -82,7 +81,7 @@ public class PutFollowRequestTests extends AbstractXContentTestCase<PutFollowReq
@Override
protected PutFollowRequest createTestInstance() {
PutFollowRequest putFollowRequest =
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4));
new PutFollowRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), "followerIndex");
if (randomBoolean()) {
putFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.client.ccr;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
@ -30,11 +29,10 @@ import java.io.IOException;
public class ResumeFollowRequestTests extends AbstractXContentTestCase<ResumeFollowRequest> {
private static final ConstructingObjectParser<ResumeFollowRequest, Void> PARSER = new ConstructingObjectParser<>("test_parser",
true, (args) -> new ResumeFollowRequest((String) args[0]));
private static final ObjectParser<ResumeFollowRequest, Void> PARSER = new ObjectParser<>("test_parser",
true, () -> new ResumeFollowRequest("followerIndex"));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
ResumeFollowRequest::setMaxReadRequestSize,
@ -79,7 +77,7 @@ public class ResumeFollowRequestTests extends AbstractXContentTestCase<ResumeFol
@Override
protected ResumeFollowRequest createTestInstance() {
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4));
ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("followerIndex");
if (randomBoolean()) {
resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}

View File

@ -41,7 +41,6 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.util.ArrayList;
import java.util.Collections;
@ -514,23 +513,20 @@ public class AutoFollowCoordinator implements ClusterStateListener {
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
followRequest.setMaxReadRequestSize(pattern.getMaxReadRequestSize());
followRequest.setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
followRequest.setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
followRequest.setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
followRequest.setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
followRequest.setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setReadPollTimeout(pattern.getPollTimeout());
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowRequest(followRequest);
request.setFollowerIndex(followIndexName);
request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());
request.getParameters().setMaxWriteRequestOperationCount(pattern.getMaxWriteRequestOperationCount());
request.getParameters().setMaxWriteRequestSize(pattern.getMaxWriteRequestSize());
request.getParameters().setMaxOutstandingWriteRequests(pattern.getMaxOutstandingWriteRequests());
request.getParameters().setMaxWriteBufferCount(pattern.getMaxWriteBufferCount());
request.getParameters().setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.getParameters().setMaxRetryDelay(pattern.getMaxRetryDelay());
request.getParameters().setReadPollTimeout(pattern.getPollTimeout());
// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {

View File

@ -22,7 +22,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
@ -97,18 +97,17 @@ public class TransportFollowInfoAction extends TransportMasterNodeReadAction<Fol
String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
if (result.isPresent()) {
ShardFollowTask params = result.get();
FollowParameters followParameters = new FollowParameters(
params.getMaxReadRequestOperationCount(),
params.getMaxReadRequestSize(),
params.getMaxOutstandingReadRequests(),
params.getMaxWriteRequestOperationCount(),
params.getMaxWriteRequestSize(),
params.getMaxOutstandingWriteRequests(),
params.getMaxWriteBufferCount(),
params.getMaxWriteBufferSize(),
params.getMaxRetryDelay(),
params.getReadPollTimeout()
);
FollowParameters followParameters = new FollowParameters();
followParameters.setMaxOutstandingReadRequests(params.getMaxOutstandingReadRequests());
followParameters.setMaxOutstandingWriteRequests(params.getMaxOutstandingWriteRequests());
followParameters.setMaxReadRequestOperationCount(params.getMaxReadRequestOperationCount());
followParameters.setMaxWriteRequestOperationCount(params.getMaxWriteRequestOperationCount());
followParameters.setMaxReadRequestSize(params.getMaxReadRequestSize());
followParameters.setMaxWriteRequestSize(params.getMaxWriteRequestSize());
followParameters.setMaxWriteBufferCount(params.getMaxWriteBufferCount());
followParameters.setMaxWriteBufferSize(params.getMaxWriteBufferSize());
followParameters.setMaxRetryDelay(params.getMaxRetryDelay());
followParameters.setReadPollTimeout(params.getReadPollTimeout());
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters));
} else {
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null));

View File

@ -147,8 +147,7 @@ public class TransportPutAutoFollowPatternAction extends
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
previousPattern, followedIndexUUIDs);
} else {
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
followedIndexUUIDs);
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs);
}
if (filteredHeaders != null) {
@ -159,16 +158,16 @@ public class TransportPutAutoFollowPatternAction extends
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getMaxReadRequestOperationCount(),
request.getMaxReadRequestSize(),
request.getMaxConcurrentReadBatches(),
request.getMaxWriteRequestOperationCount(),
request.getMaxWriteRequestSize(),
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferCount(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getReadPollTimeout());
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxReadRequestSize(),
request.getParameters().getMaxOutstandingReadRequests(),
request.getParameters().getMaxWriteRequestOperationCount(),
request.getParameters().getMaxWriteRequestSize(),
request.getParameters().getMaxOutstandingWriteRequests(),
request.getParameters().getMaxWriteBufferCount(),
request.getParameters().getMaxWriteBufferSize(),
request.getParameters().getMaxRetryDelay(),
request.getParameters().getReadPollTimeout());
patterns.put(request.getName(), autoFollowPattern);
ClusterState.Builder newState = ClusterState.builder(localState);
newState.metaData(MetaData.builder(localState.getMetaData())

View File

@ -38,6 +38,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
@ -126,18 +127,18 @@ public final class TransportPutFollowAction
// soft deletes are enabled by default on indices created on 7.0.0 or later
if (leaderIndexMetaData.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(),
IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(leaderIndexMetaData.getSettings()).onOrAfter(Version.V_7_0_0)) == false) {
listener.onFailure(
new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not have soft deletes enabled"));
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() +
"] does not have soft deletes enabled"));
return;
}
final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex())
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);
final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
@ -217,10 +218,14 @@ public final class TransportPutFollowAction
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(new FollowParameters(parameters));
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
@ -232,6 +237,6 @@ public final class TransportPutFollowAction
@Override
protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowerIndex());
}
}

View File

@ -16,8 +16,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException;
@ -177,8 +178,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
for (int shardId = 0; shardId < numShards; shardId++) {
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request.getParameters(),
leaderIndexMetadata, followIndexMetadata, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId));
}
@ -190,6 +190,8 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
final IndexMetaData followIndex,
final String[] leaderIndexHistoryUUID,
final MapperService followerMapperService) {
FollowParameters parameters = request.getParameters();
Map<String, String> ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
if (ccrIndexMetadata == null) {
throw new IllegalArgumentException("follow index ["+ followIndex.getIndex().getName() + "] does not have ccr metadata");
@ -197,8 +199,8 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
String leaderIndexUUID = leaderIndex.getIndex().getUUID();
String recordedLeaderIndexUUID = ccrIndexMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
if (leaderIndexUUID.equals(recordedLeaderIndexUUID) == false) {
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] should reference [" + leaderIndexUUID +
"] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index");
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] should reference [" +
leaderIndexUUID + "] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index");
}
String[] recordedHistoryUUIDs = extractLeaderShardHistoryUUIDs(ccrIndexMetadata);
@ -219,7 +221,8 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
"] does not have soft deletes enabled");
}
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(followIndex.getSettings()) == false) {
throw new IllegalArgumentException("follower index [" + request.getFollowerIndex() + "] does not have soft deletes enabled");
throw new IllegalArgumentException("follower index [" + request.getFollowerIndex() +
"] does not have soft deletes enabled");
}
if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
@ -251,69 +254,69 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
private static ShardFollowTask createShardFollowTask(
int shardId,
String clusterAliasName,
ResumeFollowAction.Request request,
FollowParameters parameters,
IndexMetaData leaderIndexMetadata,
IndexMetaData followIndexMetadata,
Map<String, String> filteredHeaders
) {
int maxReadRequestOperationCount;
if (request.getMaxReadRequestOperationCount() != null) {
maxReadRequestOperationCount = request.getMaxReadRequestOperationCount();
if (parameters.getMaxReadRequestOperationCount() != null) {
maxReadRequestOperationCount = parameters.getMaxReadRequestOperationCount();
} else {
maxReadRequestOperationCount = DEFAULT_MAX_READ_REQUEST_OPERATION_COUNT;
}
ByteSizeValue maxReadRequestSize;
if (request.getMaxReadRequestSize() != null) {
maxReadRequestSize = request.getMaxReadRequestSize();
if (parameters.getMaxReadRequestSize() != null) {
maxReadRequestSize = parameters.getMaxReadRequestSize();
} else {
maxReadRequestSize = DEFAULT_MAX_READ_REQUEST_SIZE;
}
int maxOutstandingReadRequests;
if (request.getMaxOutstandingReadRequests() != null){
maxOutstandingReadRequests = request.getMaxOutstandingReadRequests();
if (parameters.getMaxOutstandingReadRequests() != null){
maxOutstandingReadRequests = parameters.getMaxOutstandingReadRequests();
} else {
maxOutstandingReadRequests = DEFAULT_MAX_OUTSTANDING_READ_REQUESTS;
}
final int maxWriteRequestOperationCount;
if (request.getMaxWriteRequestOperationCount() != null) {
maxWriteRequestOperationCount = request.getMaxWriteRequestOperationCount();
if (parameters.getMaxWriteRequestOperationCount() != null) {
maxWriteRequestOperationCount = parameters.getMaxWriteRequestOperationCount();
} else {
maxWriteRequestOperationCount = DEFAULT_MAX_WRITE_REQUEST_OPERATION_COUNT;
}
final ByteSizeValue maxWriteRequestSize;
if (request.getMaxWriteRequestSize() != null) {
maxWriteRequestSize = request.getMaxWriteRequestSize();
if (parameters.getMaxWriteRequestSize() != null) {
maxWriteRequestSize = parameters.getMaxWriteRequestSize();
} else {
maxWriteRequestSize = DEFAULT_MAX_WRITE_REQUEST_SIZE;
}
int maxOutstandingWriteRequests;
if (request.getMaxOutstandingWriteRequests() != null) {
maxOutstandingWriteRequests = request.getMaxOutstandingWriteRequests();
if (parameters.getMaxOutstandingWriteRequests() != null) {
maxOutstandingWriteRequests = parameters.getMaxOutstandingWriteRequests();
} else {
maxOutstandingWriteRequests = DEFAULT_MAX_OUTSTANDING_WRITE_REQUESTS;
}
int maxWriteBufferCount;
if (request.getMaxWriteBufferCount() != null) {
maxWriteBufferCount = request.getMaxWriteBufferCount();
if (parameters.getMaxWriteBufferCount() != null) {
maxWriteBufferCount = parameters.getMaxWriteBufferCount();
} else {
maxWriteBufferCount = DEFAULT_MAX_WRITE_BUFFER_COUNT;
}
ByteSizeValue maxWriteBufferSize;
if (request.getMaxWriteBufferSize() != null) {
maxWriteBufferSize = request.getMaxWriteBufferSize();
if (parameters.getMaxWriteBufferSize() != null) {
maxWriteBufferSize = parameters.getMaxWriteBufferSize();
} else {
maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE;
}
TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay();
TimeValue readPollTimeout = request.getReadPollTimeout() == null ? DEFAULT_READ_POLL_TIMEOUT : request.getReadPollTimeout();
TimeValue maxRetryDelay = parameters.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : parameters.getMaxRetryDelay();
TimeValue readPollTimeout = parameters.getReadPollTimeout() == null ? DEFAULT_READ_POLL_TIMEOUT : parameters.getReadPollTimeout();
return new ShardFollowTask(
clusterAliasName,

View File

@ -426,7 +426,9 @@ public abstract class CcrIntegTestCase extends ESTestCase {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster("leader_cluster");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(resumeFollow(followerIndex));
request.setFollowerIndex(followerIndex);
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
request.waitForActiveShards(waitForActiveShards);
return request;
}
@ -434,8 +436,8 @@ public abstract class CcrIntegTestCase extends ESTestCase {
public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
return request;
}

View File

@ -89,8 +89,8 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
protected ResumeFollowAction.Request getResumeFollowRequest(String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(1));
request.setReadPollTimeout(TimeValue.timeValueMillis(1));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(1));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(1));
return request;
}
@ -98,7 +98,9 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster("local");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(getResumeFollowRequest(followerIndex));
request.setFollowerIndex(followerIndex);
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(1));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(1));
request.waitForActiveShards(ActiveShardCount.ONE);
return request;
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
@ -186,41 +186,42 @@ public class AutoFollowIT extends CcrIntegTestCase {
// Enabling auto following:
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my-pattern");
request.setRemoteCluster("leader_cluster");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
// Need to set this, because following an index in the same cluster
request.setFollowIndexNamePattern("copy-{{leader_index}}");
if (randomBoolean()) {
request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
request.getParameters().setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE));
request.getParameters().setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE));
request.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
request.getParameters().setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
request.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setReadPollTimeout(TimeValue.timeValueMillis(500));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
request.getParameters().setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
request.getParameters().setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
request.getParameters().setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
}
request.setName("my-pattern");
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
createLeaderIndex("logs-201901", leaderIndexSettings);
@ -242,35 +243,39 @@ public class AutoFollowIT extends CcrIntegTestCase {
FollowParameters followParameters = followerInfo.getParameters();
assertThat(followParameters, notNullValue());
if (request.getMaxWriteBufferCount() != null) {
assertThat(followParameters.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount()));
if (request.getParameters().getMaxWriteBufferCount() != null) {
assertThat(followParameters.getMaxWriteBufferCount(), equalTo(request.getParameters().getMaxWriteBufferCount()));
}
if (request.getMaxWriteBufferSize() != null) {
assertThat(followParameters.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize()));
if (request.getParameters().getMaxWriteBufferSize() != null) {
assertThat(followParameters.getMaxWriteBufferSize(), equalTo(request.getParameters().getMaxWriteBufferSize()));
}
if (request.getMaxConcurrentReadBatches() != null) {
assertThat(followParameters.getMaxOutstandingReadRequests(), equalTo(request.getMaxConcurrentReadBatches()));
if (request.getParameters().getMaxOutstandingReadRequests() != null) {
assertThat(followParameters.getMaxOutstandingReadRequests(),
equalTo(request.getParameters().getMaxOutstandingReadRequests()));
}
if (request.getMaxConcurrentWriteBatches() != null) {
assertThat(followParameters.getMaxOutstandingWriteRequests(), equalTo(request.getMaxConcurrentWriteBatches()));
if (request.getParameters().getMaxOutstandingWriteRequests() != null) {
assertThat(followParameters.getMaxOutstandingWriteRequests(),
equalTo(request.getParameters().getMaxOutstandingWriteRequests()));
}
if (request.getMaxReadRequestOperationCount() != null) {
assertThat(followParameters.getMaxReadRequestOperationCount(), equalTo(request.getMaxReadRequestOperationCount()));
if (request.getParameters().getMaxReadRequestOperationCount() != null) {
assertThat(followParameters.getMaxReadRequestOperationCount(),
equalTo(request.getParameters().getMaxReadRequestOperationCount()));
}
if (request.getMaxReadRequestSize() != null) {
assertThat(followParameters.getMaxReadRequestSize(), equalTo(request.getMaxReadRequestSize()));
if (request.getParameters().getMaxReadRequestSize() != null) {
assertThat(followParameters.getMaxReadRequestSize(), equalTo(request.getParameters().getMaxReadRequestSize()));
}
if (request.getMaxRetryDelay() != null) {
assertThat(followParameters.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
if (request.getParameters().getMaxRetryDelay() != null) {
assertThat(followParameters.getMaxRetryDelay(), equalTo(request.getParameters().getMaxRetryDelay()));
}
if (request.getReadPollTimeout() != null) {
assertThat(followParameters.getReadPollTimeout(), equalTo(request.getReadPollTimeout()));
if (request.getParameters().getReadPollTimeout() != null) {
assertThat(followParameters.getReadPollTimeout(), equalTo(request.getParameters().getReadPollTimeout()));
}
if (request.getMaxWriteRequestOperationCount() != null) {
assertThat(followParameters.getMaxWriteRequestOperationCount(), equalTo(request.getMaxWriteRequestOperationCount()));
if (request.getParameters().getMaxWriteRequestOperationCount() != null) {
assertThat(followParameters.getMaxWriteRequestOperationCount(),
equalTo(request.getParameters().getMaxWriteRequestOperationCount()));
}
if (request.getMaxWriteRequestSize() != null) {
assertThat(followParameters.getMaxWriteRequestSize(), equalTo(request.getMaxWriteRequestSize()));
if (request.getParameters().getMaxWriteRequestSize() != null) {
assertThat(followParameters.getMaxWriteRequestSize(), equalTo(request.getParameters().getMaxWriteRequestSize()));
}
});
}

View File

@ -90,13 +90,13 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
}
availableDocs.release(between(100, 200));
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
follow.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
follow.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
follow.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
follow.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
logger.info("--> follow params {}", Strings.toString(follow.getFollowRequest()));
follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
follow.getParameters().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
follow.getParameters().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
logger.info("--> follow request {}", Strings.toString(follow));
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
disableDelayedAllocation("follower-index");
ensureFollowerGreen("follower-index");
@ -151,17 +151,17 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
thread.start();
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
followRequest.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followRequest.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
followRequest.getParameters().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getParameters().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
disableDelayedAllocation("index2");
logger.info("--> follow params {}", Strings.toString(followRequest.getFollowRequest()));
logger.info("--> follow request {}", Strings.toString(followRequest));
int maxOpsPerRead = followRequest.getFollowRequest().getMaxReadRequestOperationCount();
int maxOpsPerRead = followRequest.getParameters().getMaxReadRequestOperationCount();
int maxNumDocsReplicated = Math.min(between(50, 500), between(maxOpsPerRead, maxOpsPerRead * 10));
availableDocs.release(maxNumDocsReplicated / 2 + 1);
atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated / 3);

View File

@ -180,7 +180,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
pauseFollow("index2");
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
@ -446,10 +446,10 @@ public class IndexFollowingIT extends CcrIntegTestCase {
atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed / 3);
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxReadRequestOperationCount(maxOpsPerRead);
followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followRequest.getFollowRequest().setMaxWriteBufferCount(randomIntBetween(1024, 10240));
followRequest.getParameters().setMaxReadRequestOperationCount(maxOpsPerRead);
followRequest.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
followRequest.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followRequest.getParameters().setMaxWriteBufferCount(randomIntBetween(1024, 10240));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
availableDocs.release(numDocsIndexed * 2 + bulkSize);
atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed);
@ -544,7 +544,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(1, ByteSizeUnit.BYTES));
followRequest.getParameters().setMaxReadRequestSize(new ByteSizeValue(1, ByteSizeUnit.BYTES));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
@ -1016,7 +1016,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
forceMergeRequest.maxNumSegments(1);
leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet();
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get();
assertBusy(() -> {
List<ShardFollowNodeTaskStatus> statuses = getFollowTaskStatuses("index2");

View File

@ -85,7 +85,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
followRequest.setFollowerIndex("follower-index");
PutFollowAction.Request putFollowRequest = getPutFollowRequest("leader", "follower");
putFollowRequest.setLeaderIndex("leader-index");
putFollowRequest.setFollowRequest(followRequest);
putFollowRequest.setFollowerIndex("follower-index");
IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
() -> client().execute(PutFollowAction.INSTANCE, putFollowRequest).actionGet());
assertThat(error.getMessage(), equalTo("leader index [leader-index] does not have soft deletes enabled"));
@ -98,7 +98,7 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
request.setRemoteCluster("local");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
request.setFollowIndexNamePattern("copy-{{leader_index}}");
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
long previousNumberOfSuccessfulFollowedIndices = getAutoFollowStats().getNumberOfSuccessfulFollowIndices();

View File

@ -109,7 +109,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
successHandler.run();
}
@ -227,7 +227,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Consumer<Exception> failureHandler) {
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
successHandler.run();
}
@ -284,7 +284,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Consumer<Exception> failureHandler) {
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
failureHandler.accept(failure);
}

View File

@ -20,61 +20,13 @@ import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
public class FollowInfoResponseTests extends AbstractSerializingTestCase<FollowInfoAction.Response> {
static final ConstructingObjectParser<FollowParameters, Void> PARAMETERS_PARSER = new ConstructingObjectParser<>(
"parameters_parser",
args -> {
return new FollowParameters(
(Integer) args[0],
(ByteSizeValue) args[1],
(Integer) args[2],
(Integer) args[3],
(ByteSizeValue) args[4],
(Integer) args[5],
(Integer) args[6],
(ByteSizeValue) args[7],
(TimeValue) args[8],
(TimeValue) args[9]
);
});
static {
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_READ_REQUEST_OPERATION_COUNT);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_READ_REQUEST_SIZE.getPreferredName()),
ShardFollowTask.MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_READ_REQUESTS);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_REQUEST_OPERATION_COUNT);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_REQUEST_SIZE.getPreferredName()),
ShardFollowTask.MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_WRITE_REQUESTS);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_COUNT);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName()),
ShardFollowTask.MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.MAX_RETRY_DELAY.getPreferredName()),
ShardFollowTask.MAX_RETRY_DELAY,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.READ_POLL_TIMEOUT.getPreferredName()),
ShardFollowTask.READ_POLL_TIMEOUT,
ObjectParser.ValueType.STRING);
}
static final ObjectParser<FollowParameters, Void> PARAMETERS_PARSER = new ObjectParser<>("parameters_parser", FollowParameters::new);
static final ConstructingObjectParser<FollowerInfo, Void> INFO_PARSER = new ConstructingObjectParser<>(
"info_parser",
args -> {
@ -88,6 +40,8 @@ public class FollowInfoResponseTests extends AbstractSerializingTestCase<FollowI
});
static {
FollowParameters.initParser(PARAMETERS_PARSER);
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.FOLLOWER_INDEX_FIELD);
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.REMOTE_CLUSTER_FIELD);
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.LEADER_INDEX_FIELD);
@ -125,18 +79,17 @@ public class FollowInfoResponseTests extends AbstractSerializingTestCase<FollowI
for (int i = 0; i < numInfos; i++) {
FollowParameters followParameters = null;
if (randomBoolean()) {
followParameters = new FollowParameters(
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
new TimeValue(randomNonNegativeLong()),
new TimeValue(randomNonNegativeLong())
);
followParameters = new FollowParameters();
followParameters.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE));
followParameters.setMaxOutstandingWriteRequests(randomIntBetween(0, Integer.MAX_VALUE));
followParameters.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
followParameters.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
followParameters.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong()));
followParameters.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
followParameters.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
followParameters.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
followParameters.setMaxRetryDelay(new TimeValue(randomNonNegativeLong()));
followParameters.setReadPollTimeout(new TimeValue(randomNonNegativeLong()));
}
infos.add(new FollowerInfo(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4),

View File

@ -7,10 +7,9 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
@ -43,44 +42,27 @@ public class PutAutoFollowPatternRequestTests extends AbstractSerializingTestCas
protected PutAutoFollowPatternAction.Request createTestInstance() {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName(randomAlphaOfLength(4));
request.setRemoteCluster(randomAlphaOfLength(4));
request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false)));
if (randomBoolean()) {
request.setFollowIndexNamePattern(randomAlphaOfLength(4));
}
ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
return request;
}
@Override
protected PutAutoFollowPatternAction.Request createXContextTestInstance(XContentType xContentType) {
// follower index parameter is not part of the request body and is provided in the url path.
// So this field cannot be used for creating a test instance for xcontent testing.
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setRemoteCluster(randomAlphaOfLength(4));
request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false)));
if (randomBoolean()) {
request.setReadPollTimeout(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentReadBatches(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxConcurrentWriteBatches(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong()));
request.setFollowIndexNamePattern(randomAlphaOfLength(4));
}
ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
return request;
}
@ -109,17 +91,17 @@ public class PutAutoFollowPatternRequestTests extends AbstractSerializingTestCas
validationException = request.validate();
assertThat(validationException, nullValue());
request.setMaxRetryDelay(TimeValue.ZERO);
request.getParameters().setMaxRetryDelay(TimeValue.ZERO);
validationException = request.validate();
assertThat(validationException, notNullValue());
assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]"));
request.setMaxRetryDelay(TimeValue.timeValueMinutes(10));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(10));
validationException = request.validate();
assertThat(validationException, notNullValue());
assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]"));
request.setMaxRetryDelay(TimeValue.timeValueMinutes(1));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(1));
validationException = request.validate();
assertThat(validationException, nullValue());
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@ -23,15 +24,31 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
@Override
protected PutFollowAction.Request createTestInstance() {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setFollowerIndex(randomAlphaOfLength(4));
request.waitForActiveShards(randomFrom(ActiveShardCount.DEFAULT, ActiveShardCount.NONE, ActiveShardCount.ONE,
ActiveShardCount.ALL));
request.setRemoteCluster(randomAlphaOfLength(4));
request.setLeaderIndex(randomAlphaOfLength(4));
request.setFollowRequest(ResumeFollowActionRequestTests.createTestRequest());
ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
return request;
}
@Override
protected PutFollowAction.Request createXContextTestInstance(XContentType xContentType) {
// follower index parameter and wait for active shards params are not part of the request body and
// are provided in the url path. So these fields cannot be used for creating a test instance for xcontent testing.
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(randomAlphaOfLength(4));
request.setLeaderIndex(randomAlphaOfLength(4));
request.setFollowerIndex("followerIndex");
ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
return request;
}
@Override
protected PutFollowAction.Request doParseInstance(XContentParser parser) throws IOException {
return PutFollowAction.Request.fromXContent(parser, null, ActiveShardCount.DEFAULT);
return PutFollowAction.Request.fromXContent(parser, "followerIndex", ActiveShardCount.DEFAULT);
}
@Override

View File

@ -11,7 +11,9 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException;
@ -29,7 +31,20 @@ public class ResumeFollowActionRequestTests extends AbstractSerializingTestCase<
@Override
protected ResumeFollowAction.Request createTestInstance() {
return createTestRequest();
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(randomAlphaOfLength(4));
generateFollowParameters(request.getParameters());
return request;
}
@Override
protected ResumeFollowAction.Request createXContextTestInstance(XContentType type) {
// follower index parameter is not part of the request body and is provided in the url path.
// So this field cannot be used for creating a test instance for xcontent testing.
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
generateFollowParameters(request.getParameters());
return request;
}
@Override
@ -42,57 +57,54 @@ public class ResumeFollowActionRequestTests extends AbstractSerializingTestCase<
return false;
}
static ResumeFollowAction.Request createTestRequest() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(randomAlphaOfLength(4));
static void generateFollowParameters(FollowParameters followParameters) {
if (randomBoolean()) {
request.setMaxReadRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE));
followParameters.setMaxReadRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxOutstandingReadRequests(randomIntBetween(1, Integer.MAX_VALUE));
followParameters.setMaxOutstandingReadRequests(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxOutstandingWriteRequests(randomIntBetween(1, Integer.MAX_VALUE));
followParameters.setMaxOutstandingWriteRequests(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
followParameters.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE));
followParameters.setMaxWriteBufferCount(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE));
followParameters.setMaxWriteRequestOperationCount(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
request.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
followParameters.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong()));
}
if (randomBoolean()) {
request.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
followParameters.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong(), ByteSizeUnit.BYTES));
}
if (randomBoolean()) {
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
followParameters.setMaxRetryDelay(TimeValue.timeValueMillis(500));
}
if (randomBoolean()) {
request.setReadPollTimeout(TimeValue.timeValueMillis(500));
followParameters.setReadPollTimeout(TimeValue.timeValueMillis(500));
}
return request;
}
public void testValidate() {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex("index2");
request.setMaxRetryDelay(TimeValue.ZERO);
request.getParameters().setMaxRetryDelay(TimeValue.ZERO);
ActionRequestValidationException validationException = request.validate();
assertThat(validationException, notNullValue());
assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]"));
request.setMaxRetryDelay(TimeValue.timeValueMinutes(10));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(10));
validationException = request.validate();
assertThat(validationException, notNullValue());
assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]"));
request.setMaxRetryDelay(TimeValue.timeValueMinutes(1));
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMinutes(1));
validationException = request.validate();
assertThat(validationException, nullValue());
}

View File

@ -14,8 +14,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -24,17 +22,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.READ_POLL_TIMEOUT;
public class FollowInfoAction extends Action<FollowInfoAction.Response> {
public static final String NAME = "cluster:monitor/ccr/follow_info";
@ -202,7 +189,7 @@ public class FollowInfoAction extends Action<FollowInfoAction.Response> {
remoteCluster = in.readString();
leaderIndex = in.readString();
status = Status.fromString(in.readString());
parameters = in.readOptionalWriteable(FollowParameters::new);
parameters = in.readOptionalWriteable(innerIn -> new FollowParameters(in));
}
@Override
@ -224,16 +211,7 @@ public class FollowInfoAction extends Action<FollowInfoAction.Response> {
if (parameters != null) {
builder.startObject(PARAMETERS_FIELD.getPreferredName());
{
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxReadRequestOperationCount);
builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), parameters.maxReadRequestSize.getStringRep());
builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), parameters.maxOutstandingReadRequests);
builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxWriteRequestOperationCount);
builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), parameters.maxWriteRequestSize.getStringRep());
builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), parameters.maxOutstandingWriteRequests);
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), parameters.maxWriteBufferCount);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), parameters.maxWriteBufferSize.getStringRep());
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), parameters.maxRetryDelay.getStringRep());
builder.field(READ_POLL_TIMEOUT.getPreferredName(), parameters.readPollTimeout.getStringRep());
parameters.toXContentFragment(builder);
}
builder.endObject();
}
@ -263,138 +241,6 @@ public class FollowInfoAction extends Action<FollowInfoAction.Response> {
}
}
public static class FollowParameters implements Writeable {
private final int maxReadRequestOperationCount;
private final ByteSizeValue maxReadRequestSize;
private final int maxOutstandingReadRequests;
private final int maxWriteRequestOperationCount;
private final ByteSizeValue maxWriteRequestSize;
private final int maxOutstandingWriteRequests;
private final int maxWriteBufferCount;
private final ByteSizeValue maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue readPollTimeout;
public FollowParameters(int maxReadRequestOperationCount,
ByteSizeValue maxReadRequestSize, int maxOutstandingReadRequests,
int maxWriteRequestOperationCount, ByteSizeValue maxWriteRequestSize,
int maxOutstandingWriteRequests, int maxWriteBufferCount,
ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue readPollTimeout) {
this.maxReadRequestOperationCount = maxReadRequestOperationCount;
this.maxReadRequestSize = maxReadRequestSize;
this.maxOutstandingReadRequests = maxOutstandingReadRequests;
this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
this.maxWriteRequestSize = maxWriteRequestSize;
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
this.maxWriteBufferCount = maxWriteBufferCount;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.readPollTimeout = readPollTimeout;
}
public int getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
}
public ByteSizeValue getMaxReadRequestSize() {
return maxReadRequestSize;
}
public int getMaxOutstandingReadRequests() {
return maxOutstandingReadRequests;
}
public int getMaxWriteRequestOperationCount() {
return maxWriteRequestOperationCount;
}
public ByteSizeValue getMaxWriteRequestSize() {
return maxWriteRequestSize;
}
public int getMaxOutstandingWriteRequests() {
return maxOutstandingWriteRequests;
}
public int getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
public TimeValue getReadPollTimeout() {
return readPollTimeout;
}
FollowParameters(StreamInput in) throws IOException {
this.maxReadRequestOperationCount = in.readVInt();
this.maxReadRequestSize = new ByteSizeValue(in);
this.maxOutstandingReadRequests = in.readVInt();
this.maxWriteRequestOperationCount = in.readVInt();
this.maxWriteRequestSize = new ByteSizeValue(in);
this.maxOutstandingWriteRequests = in.readVInt();
this.maxWriteBufferCount = in.readVInt();
this.maxWriteBufferSize = new ByteSizeValue(in);
this.maxRetryDelay = in.readTimeValue();
this.readPollTimeout = in.readTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxReadRequestOperationCount);
maxReadRequestSize.writeTo(out);
out.writeVInt(maxOutstandingReadRequests);
out.writeVLong(maxWriteRequestOperationCount);
maxWriteRequestSize.writeTo(out);
out.writeVInt(maxOutstandingWriteRequests);
out.writeVInt(maxWriteBufferCount);
maxWriteBufferSize.writeTo(out);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(readPollTimeout);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowParameters that = (FollowParameters) o;
return maxReadRequestOperationCount == that.maxReadRequestOperationCount &&
maxOutstandingReadRequests == that.maxOutstandingReadRequests &&
maxWriteRequestOperationCount == that.maxWriteRequestOperationCount &&
maxOutstandingWriteRequests == that.maxOutstandingWriteRequests &&
maxWriteBufferCount == that.maxWriteBufferCount &&
Objects.equals(maxReadRequestSize, that.maxReadRequestSize) &&
Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) &&
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(readPollTimeout, that.readPollTimeout);
}
@Override
public int hashCode() {
return Objects.hash(
maxReadRequestOperationCount,
maxReadRequestSize,
maxOutstandingReadRequests,
maxWriteRequestOperationCount,
maxWriteRequestSize,
maxOutstandingWriteRequests,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout
);
}
}
public enum Status {
ACTIVE("active"),

View File

@ -0,0 +1,314 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.AbstractObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class FollowParameters implements Writeable {
static final TimeValue RETRY_DELAY_MAX = TimeValue.timeValueMinutes(5);
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");
static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count");
static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests");
static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests");
static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size");
static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size");
static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout");
Integer maxReadRequestOperationCount;
Integer maxWriteRequestOperationCount;
Integer maxOutstandingReadRequests;
Integer maxOutstandingWriteRequests;
ByteSizeValue maxReadRequestSize;
ByteSizeValue maxWriteRequestSize;
Integer maxWriteBufferCount;
ByteSizeValue maxWriteBufferSize;
TimeValue maxRetryDelay;
TimeValue readPollTimeout;
public FollowParameters() {
}
public FollowParameters(FollowParameters source) {
this.maxReadRequestOperationCount = source.maxReadRequestOperationCount;
this.maxWriteRequestOperationCount = source.maxWriteRequestOperationCount;
this.maxOutstandingReadRequests = source.maxOutstandingReadRequests;
this.maxOutstandingWriteRequests = source.maxOutstandingWriteRequests;
this.maxReadRequestSize = source.maxReadRequestSize;
this.maxWriteRequestSize = source.maxWriteRequestSize;
this.maxWriteBufferCount = source.maxWriteBufferCount;
this.maxWriteBufferSize = source.maxWriteBufferSize;
this.maxRetryDelay = source.maxRetryDelay;
this.readPollTimeout = source.readPollTimeout;
}
public Integer getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
}
public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) {
this.maxReadRequestOperationCount = maxReadRequestOperationCount;
}
public ByteSizeValue getMaxReadRequestSize() {
return maxReadRequestSize;
}
public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) {
this.maxReadRequestSize = maxReadRequestSize;
}
public Integer getMaxOutstandingReadRequests() {
return maxOutstandingReadRequests;
}
public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) {
this.maxOutstandingReadRequests = maxOutstandingReadRequests;
}
public Integer getMaxWriteRequestOperationCount() {
return maxWriteRequestOperationCount;
}
public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) {
this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
}
public ByteSizeValue getMaxWriteRequestSize() {
return maxWriteRequestSize;
}
public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) {
this.maxWriteRequestSize = maxWriteRequestSize;
}
public Integer getMaxOutstandingWriteRequests() {
return maxOutstandingWriteRequests;
}
public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) {
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
}
public Integer getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public void setMaxWriteBufferCount(Integer maxWriteBufferCount) {
this.maxWriteBufferCount = maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public TimeValue getReadPollTimeout() {
return readPollTimeout;
}
public void setReadPollTimeout(TimeValue readPollTimeout) {
this.readPollTimeout = readPollTimeout;
}
public ActionRequestValidationException validate() {
ActionRequestValidationException e = null;
if (maxReadRequestOperationCount != null && maxReadRequestOperationCount < 1) {
e = addValidationError(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxReadRequestSize != null && maxReadRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_READ_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxOutstandingReadRequests != null && maxOutstandingReadRequests < 1) {
e = addValidationError(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteRequestOperationCount != null && maxWriteRequestOperationCount < 1) {
e = addValidationError(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteRequestSize != null && maxWriteRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_WRITE_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxOutstandingWriteRequests != null && maxOutstandingWriteRequests < 1) {
e = addValidationError(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferCount != null && maxWriteBufferCount < 1) {
e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) {
String message = "[" + MAX_RETRY_DELAY.getPreferredName() + "] must be positive but was [" +
maxRetryDelay.getStringRep() + "]";
e = addValidationError(message, e);
}
if (maxRetryDelay != null && maxRetryDelay.millis() > RETRY_DELAY_MAX.millis()) {
String message = "[" + MAX_RETRY_DELAY.getPreferredName() + "] must be less than [" + RETRY_DELAY_MAX.getStringRep() +
"] but was [" + maxRetryDelay.getStringRep() + "]";
e = addValidationError(message, e);
}
return e;
}
FollowParameters(StreamInput in) throws IOException {
fromStreamInput(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVInt(maxReadRequestOperationCount);
out.writeOptionalVInt(maxOutstandingReadRequests);
out.writeOptionalWriteable(maxReadRequestSize);
out.writeOptionalVInt(maxWriteRequestOperationCount);
out.writeOptionalWriteable(maxWriteRequestSize);
out.writeOptionalVInt(maxOutstandingWriteRequests);
out.writeOptionalVInt(maxWriteBufferCount);
out.writeOptionalWriteable(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(readPollTimeout);
}
void fromStreamInput(StreamInput in) throws IOException {
maxReadRequestOperationCount = in.readOptionalVInt();
maxOutstandingReadRequests = in.readOptionalVInt();
maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
maxWriteRequestOperationCount = in.readOptionalVInt();
maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
maxOutstandingWriteRequests = in.readOptionalVInt();
maxWriteBufferCount = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
maxRetryDelay = in.readOptionalTimeValue();
readPollTimeout = in.readOptionalTimeValue();
}
XContentBuilder toXContentFragment(final XContentBuilder builder) throws IOException {
if (maxReadRequestOperationCount != null) {
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
}
if (maxWriteRequestOperationCount != null) {
builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount);
}
if (maxOutstandingReadRequests != null) {
builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests);
}
if (maxOutstandingWriteRequests != null) {
builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests);
}
if (maxReadRequestSize != null) {
builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep());
}
if (maxWriteRequestSize != null) {
builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep());
}
if (maxWriteBufferCount != null) {
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
}
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
}
if (readPollTimeout != null) {
builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep());
}
return builder;
}
public static <P extends FollowParameters> void initParser(AbstractObjectParser<P, ?> parser) {
parser.declareInt(FollowParameters::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
parser.declareInt(FollowParameters::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT);
parser.declareInt(FollowParameters::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
parser.declareInt(FollowParameters::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS);
parser.declareField(
FollowParameters::setMaxReadRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()),
AutoFollowMetadata.AutoFollowPattern.MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
parser.declareField(
FollowParameters::setMaxWriteRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()),
AutoFollowMetadata.AutoFollowPattern.MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
parser.declareInt(FollowParameters::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT);
parser.declareField(
FollowParameters::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
parser.declareField(FollowParameters::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
parser.declareField(FollowParameters::setReadPollTimeout,
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o instanceof FollowParameters == false) return false;
FollowParameters that = (FollowParameters) o;
return Objects.equals(maxReadRequestOperationCount, that.maxReadRequestOperationCount) &&
Objects.equals(maxWriteRequestOperationCount, that.maxWriteRequestOperationCount) &&
Objects.equals(maxOutstandingReadRequests, that.maxOutstandingReadRequests) &&
Objects.equals(maxOutstandingWriteRequests, that.maxOutstandingWriteRequests) &&
Objects.equals(maxReadRequestSize, that.maxReadRequestSize) &&
Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) &&
Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) &&
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(readPollTimeout, that.readPollTimeout);
}
@Override
public int hashCode() {
return Objects.hash(
maxReadRequestOperationCount,
maxWriteRequestOperationCount,
maxOutstandingReadRequests,
maxOutstandingWriteRequests,
maxReadRequestSize,
maxWriteRequestSize,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout
);
}
}

View File

@ -5,15 +5,14 @@
*/
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -32,6 +31,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/put";
public static final PutAutoFollowPatternAction INSTANCE = new PutAutoFollowPatternAction();
private static final int MAX_NAME_BYTES = 255;
private PutAutoFollowPatternAction() {
super(NAME);
@ -44,54 +44,27 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>("put_auto_follow_pattern_request", Request::new);
private static final ParseField NAME_FIELD = new ParseField("name");
private static final int MAX_NAME_BYTES = 255;
// Note that Request should be the Value class here for this parser with a 'parameters' field that maps to
// PutAutoFollowPatternParameters class. But since two minor version are already released with duplicate
// follow parameters in several APIs, PutAutoFollowPatternParameters is now the Value class here.
private static final ObjectParser<PutAutoFollowPatternParameters, Void> PARSER =
new ObjectParser<>("put_auto_follow_pattern_request", PutAutoFollowPatternParameters::new);
static {
PARSER.declareString(Request::setName, NAME_FIELD);
PARSER.declareString(Request::setRemoteCluster, REMOTE_CLUSTER_FIELD);
PARSER.declareStringArray(Request::setLeaderIndexPatterns, AutoFollowPattern.LEADER_PATTERNS_FIELD);
PARSER.declareString(Request::setFollowIndexNamePattern, AutoFollowPattern.FOLLOW_PATTERN_FIELD);
PARSER.declareInt(Request::setMaxReadRequestOperationCount, AutoFollowPattern.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
Request::setMaxReadRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_READ_REQUEST_SIZE.getPreferredName()),
AutoFollowPattern.MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(Request::setMaxConcurrentReadBatches, AutoFollowPattern.MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareInt(Request::setMaxWriteRequestOperationCount, AutoFollowPattern.MAX_WRITE_REQUEST_OPERATION_COUNT);
PARSER.declareField(
Request::setMaxWriteRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_REQUEST_SIZE.getPreferredName()),
AutoFollowPattern.MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_OUTSTANDING_WRITE_REQUESTS);
PARSER.declareInt(Request::setMaxWriteBufferCount, AutoFollowPattern.MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
Request::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName()),
AutoFollowPattern.MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName()),
AutoFollowPattern.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(Request::setReadPollTimeout,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.READ_POLL_TIMEOUT.getPreferredName()),
AutoFollowPattern.READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING);
PARSER.declareString((params, value) -> params.remoteCluster = value, REMOTE_CLUSTER_FIELD);
PARSER.declareStringArray((params, value) -> params.leaderIndexPatterns = value, AutoFollowPattern.LEADER_PATTERNS_FIELD);
PARSER.declareString((params, value) -> params.followIndexNamePattern = value, AutoFollowPattern.FOLLOW_PATTERN_FIELD);
FollowParameters.initParser(PARSER);
}
public static Request fromXContent(XContentParser parser, String name) throws IOException {
Request request = PARSER.parse(parser, null);
if (name != null) {
if (request.name == null) {
request.name = name;
} else {
if (request.name.equals(name) == false) {
throw new IllegalArgumentException("provided name is not equal");
}
}
}
PutAutoFollowPatternParameters parameters = PARSER.parse(parser, null);
Request request = new Request();
request.setName(name);
request.setRemoteCluster(parameters.remoteCluster);
request.setLeaderIndexPatterns(parameters.leaderIndexPatterns);
request.setFollowIndexNamePattern(parameters.followIndexNamePattern);
request.setParameters(parameters);
return request;
}
@ -99,40 +72,28 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
private String remoteCluster;
private List<String> leaderIndexPatterns;
private String followIndexNamePattern;
private Integer maxReadRequestOperationCount;
private ByteSizeValue maxReadRequestSize;
private Integer maxConcurrentReadBatches;
private Integer maxWriteRequestOperationCount;
private ByteSizeValue maxWriteRequestSize;
private Integer maxConcurrentWriteBatches;
private Integer maxWriteBufferCount;
private ByteSizeValue maxWriteBufferSize;
private TimeValue maxRetryDelay;
private TimeValue readPollTimeout;
private FollowParameters parameters = new FollowParameters();
public Request() {
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
ActionRequestValidationException validationException = parameters.validate();
if (name == null) {
validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] is missing", validationException);
validationException = addValidationError("[name] is missing", validationException);
}
if (name != null) {
if (name.contains(",")) {
validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] name must not contain a ','",
validationException);
validationException = addValidationError("[name] name must not contain a ','", validationException);
}
if (name.startsWith("_")) {
validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] name must not start with '_'",
validationException);
validationException = addValidationError("[name] name must not start with '_'", validationException);
}
int byteCount = name.getBytes(StandardCharsets.UTF_8).length;
if (byteCount > MAX_NAME_BYTES) {
validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] name is too long (" +
byteCount + " > " + MAX_NAME_BYTES + ")", validationException);
validationException = addValidationError("[name] name is too long (" + byteCount + " > " + MAX_NAME_BYTES + ")",
validationException);
}
}
if (remoteCluster == null) {
@ -143,19 +104,6 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
validationException = addValidationError("[" + AutoFollowPattern.LEADER_PATTERNS_FIELD.getPreferredName() +
"] is missing", validationException);
}
if (maxRetryDelay != null) {
if (maxRetryDelay.millis() <= 0) {
String message = "[" + AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName() + "] must be positive but was [" +
maxRetryDelay.getStringRep() + "]";
validationException = addValidationError(message, validationException);
}
if (maxRetryDelay.millis() > ResumeFollowAction.MAX_RETRY_DELAY.millis()) {
String message = "[" + AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName() + "] must be less than [" +
ResumeFollowAction.MAX_RETRY_DELAY +
"] but was [" + maxRetryDelay.getStringRep() + "]";
validationException = addValidationError(message, validationException);
}
}
return validationException;
}
@ -191,84 +139,12 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
this.followIndexNamePattern = followIndexNamePattern;
}
public Integer getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
public FollowParameters getParameters() {
return parameters;
}
public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) {
this.maxReadRequestOperationCount = maxReadRequestOperationCount;
}
public Integer getMaxConcurrentReadBatches() {
return maxConcurrentReadBatches;
}
public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) {
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
}
public ByteSizeValue getMaxReadRequestSize() {
return maxReadRequestSize;
}
public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) {
this.maxReadRequestSize = maxReadRequestSize;
}
public Integer getMaxWriteRequestOperationCount() {
return maxWriteRequestOperationCount;
}
public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) {
this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
}
public ByteSizeValue getMaxWriteRequestSize() {
return maxWriteRequestSize;
}
public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) {
this.maxWriteRequestSize = maxWriteRequestSize;
}
public Integer getMaxConcurrentWriteBatches() {
return maxConcurrentWriteBatches;
}
public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) {
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
}
public Integer getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public void setMaxWriteBufferCount(Integer maxWriteBufferCount) {
this.maxWriteBufferCount = maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public TimeValue getReadPollTimeout() {
return readPollTimeout;
}
public void setReadPollTimeout(TimeValue readPollTimeout) {
this.readPollTimeout = readPollTimeout;
public void setParameters(FollowParameters parameters) {
this.parameters = parameters;
}
public Request(StreamInput in) throws IOException {
@ -277,16 +153,21 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
remoteCluster = in.readString();
leaderIndexPatterns = in.readStringList();
followIndexNamePattern = in.readOptionalString();
maxReadRequestOperationCount = in.readOptionalVInt();
maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
maxConcurrentReadBatches = in.readOptionalVInt();
maxWriteRequestOperationCount = in.readOptionalVInt();
maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferCount = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
maxRetryDelay = in.readOptionalTimeValue();
readPollTimeout = in.readOptionalTimeValue();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
parameters = new FollowParameters(in);
} else {
parameters = new FollowParameters();
parameters.maxReadRequestOperationCount = in.readOptionalVInt();
parameters.maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
parameters.maxOutstandingReadRequests = in.readOptionalVInt();
parameters.maxWriteRequestOperationCount = in.readOptionalVInt();
parameters.maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
parameters.maxOutstandingWriteRequests = in.readOptionalVInt();
parameters.maxWriteBufferCount = in.readOptionalVInt();
parameters.maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
parameters.maxRetryDelay = in.readOptionalTimeValue();
parameters.readPollTimeout = in.readOptionalTimeValue();
}
}
@Override
@ -296,58 +177,32 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
out.writeString(remoteCluster);
out.writeStringCollection(leaderIndexPatterns);
out.writeOptionalString(followIndexNamePattern);
out.writeOptionalVInt(maxReadRequestOperationCount);
out.writeOptionalWriteable(maxReadRequestSize);
out.writeOptionalVInt(maxConcurrentReadBatches);
out.writeOptionalVInt(maxWriteRequestOperationCount);
out.writeOptionalWriteable(maxWriteRequestSize);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferCount);
out.writeOptionalWriteable(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(readPollTimeout);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
parameters.writeTo(out);
} else {
out.writeOptionalVInt(parameters.maxReadRequestOperationCount);
out.writeOptionalWriteable(parameters.maxReadRequestSize);
out.writeOptionalVInt(parameters.maxOutstandingReadRequests);
out.writeOptionalVInt(parameters.maxWriteRequestOperationCount);
out.writeOptionalWriteable(parameters.maxWriteRequestSize);
out.writeOptionalVInt(parameters.maxOutstandingWriteRequests);
out.writeOptionalVInt(parameters.maxWriteBufferCount);
out.writeOptionalWriteable(parameters.maxWriteBufferSize);
out.writeOptionalTimeValue(parameters.maxRetryDelay);
out.writeOptionalTimeValue(parameters.readPollTimeout);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(NAME_FIELD.getPreferredName(), name);
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(AutoFollowPattern.LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns);
if (followIndexNamePattern != null) {
builder.field(AutoFollowPattern.FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexNamePattern);
}
if (maxReadRequestOperationCount != null) {
builder.field(AutoFollowPattern.MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
}
if (maxReadRequestSize != null) {
builder.field(AutoFollowPattern.MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep());
}
if (maxWriteRequestOperationCount != null) {
builder.field(AutoFollowPattern.MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount);
}
if (maxWriteRequestSize != null) {
builder.field(AutoFollowPattern.MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep());
}
if (maxWriteBufferCount != null) {
builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
}
if (maxWriteBufferSize != null) {
builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
}
if (maxConcurrentReadBatches != null) {
builder.field(AutoFollowPattern.MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxConcurrentReadBatches);
}
if (maxConcurrentWriteBatches != null) {
builder.field(AutoFollowPattern.MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxConcurrentWriteBatches);
}
if (maxRetryDelay != null) {
builder.field(AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
}
if (readPollTimeout != null) {
builder.field(AutoFollowPattern.READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep());
}
parameters.toXContentFragment(builder);
}
builder.endObject();
return builder;
@ -359,39 +214,25 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(name, request.name) &&
Objects.equals(remoteCluster, request.remoteCluster) &&
Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) &&
Objects.equals(followIndexNamePattern, request.followIndexNamePattern) &&
Objects.equals(maxReadRequestOperationCount, request.maxReadRequestOperationCount) &&
Objects.equals(maxReadRequestSize, request.maxReadRequestSize) &&
Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) &&
Objects.equals(maxWriteRequestOperationCount, request.maxWriteRequestOperationCount) &&
Objects.equals(maxWriteRequestSize, request.maxWriteRequestSize) &&
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(readPollTimeout, request.readPollTimeout);
Objects.equals(remoteCluster, request.remoteCluster) &&
Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) &&
Objects.equals(followIndexNamePattern, request.followIndexNamePattern) &&
Objects.equals(parameters, request.parameters);
}
@Override
public int hashCode() {
return Objects.hash(
name,
remoteCluster,
leaderIndexPatterns,
followIndexNamePattern,
maxReadRequestOperationCount,
maxReadRequestSize,
maxConcurrentReadBatches,
maxWriteRequestOperationCount,
maxWriteRequestSize,
maxConcurrentWriteBatches,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout);
return Objects.hash(name, remoteCluster, leaderIndexPatterns, followIndexNamePattern, parameters);
}
// This class only exists for reuse of the FollowParameters class, see comment above the parser field.
private static class PutAutoFollowPatternParameters extends FollowParameters {
private String remoteCluster;
private List<String> leaderIndexPatterns;
private String followIndexNamePattern;
}
}
}

View File

@ -18,8 +18,6 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -29,17 +27,6 @@ import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.FOLLOWER_INDEX_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.READ_POLL_TIMEOUT;
public final class PutFollowAction extends Action<PutFollowAction.Response> {
@ -65,72 +52,47 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, () -> {
Request request = new Request();
request.setFollowRequest(new ResumeFollowAction.Request());
return request;
});
// Note that Request should be the Value class here for this parser with a 'parameters' field that maps to
// PutFollowParameters class. But since two minor version are already released with duplicate follow parameters
// in several APIs, PutFollowParameters is now the Value class here.
private static final ObjectParser<PutFollowParameters, Void> PARSER = new ObjectParser<>(NAME, PutFollowParameters::new);
static {
PARSER.declareString(Request::setRemoteCluster, REMOTE_CLUSTER_FIELD);
PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD);
PARSER.declareString((req, val) -> req.followRequest.setFollowerIndex(val), FOLLOWER_INDEX_FIELD);
PARSER.declareInt((req, val) -> req.followRequest.setMaxReadRequestOperationCount(val), MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
(req, val) -> req.followRequest.setMaxReadRequestSize(val),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()),
MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt((req, val) -> req.followRequest.setMaxOutstandingReadRequests(val), MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareInt((req, val) -> req.followRequest.setMaxWriteRequestOperationCount(val), MAX_WRITE_REQUEST_OPERATION_COUNT);
PARSER.declareField(
(req, val) -> req.followRequest.setMaxWriteRequestSize(val),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()),
MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt((req, val) -> req.followRequest.setMaxOutstandingWriteRequests(val), MAX_OUTSTANDING_WRITE_REQUESTS);
PARSER.declareInt((req, val) -> req.followRequest.setMaxWriteBufferCount(val), MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
(req, val) -> req.followRequest.setMaxWriteBufferSize(val),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(
(req, val) -> req.followRequest.setMaxRetryDelay(val),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
MAX_RETRY_DELAY_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareField(
(req, val) -> req.followRequest.setReadPollTimeout(val),
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
READ_POLL_TIMEOUT,
ObjectParser.ValueType.STRING);
PARSER.declareString((putFollowParameters, value) -> putFollowParameters.remoteCluster = value, REMOTE_CLUSTER_FIELD);
PARSER.declareString((putFollowParameters, value) -> putFollowParameters.leaderIndex = value, LEADER_INDEX_FIELD);
FollowParameters.initParser(PARSER);
}
public static Request fromXContent(final XContentParser parser, final String followerIndex, ActiveShardCount waitForActiveShards)
throws IOException {
Request request = PARSER.parse(parser, followerIndex);
if (followerIndex != null) {
if (request.getFollowRequest().getFollowerIndex() == null) {
request.getFollowRequest().setFollowerIndex(followerIndex);
} else {
if (request.getFollowRequest().getFollowerIndex().equals(followerIndex) == false) {
throw new IllegalArgumentException("provided follower_index is not equal");
}
}
}
PutFollowParameters parameters = PARSER.parse(parser, null);
Request request = new Request();
request.waitForActiveShards(waitForActiveShards);
request.setFollowerIndex(followerIndex);
request.setRemoteCluster(parameters.remoteCluster);
request.setLeaderIndex(parameters.leaderIndex);
request.setParameters(parameters);
return request;
}
private String remoteCluster;
private String leaderIndex;
private String followerIndex;
private FollowParameters parameters = new FollowParameters();
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private ResumeFollowAction.Request followRequest;
public Request() {
}
public String getFollowerIndex() {
return followerIndex;
}
public void setFollowerIndex(String followerIndex) {
this.followerIndex = followerIndex;
}
public String getRemoteCluster() {
return remoteCluster;
}
@ -147,6 +109,14 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
this.leaderIndex = leaderIndex;
}
public FollowParameters getParameters() {
return parameters;
}
public void setParameters(FollowParameters parameters) {
this.parameters = parameters;
}
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
@ -168,29 +138,24 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
}
}
public ResumeFollowAction.Request getFollowRequest() {
return followRequest;
}
public void setFollowRequest(ResumeFollowAction.Request followRequest) {
this.followRequest = followRequest;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException e = followRequest.validate();
ActionRequestValidationException e = parameters.validate();
if (remoteCluster == null) {
e = addValidationError(REMOTE_CLUSTER_FIELD.getPreferredName() + " is missing", e);
}
if (leaderIndex == null) {
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
}
if (followerIndex == null) {
e = addValidationError("follower_index is missing", e);
}
return e;
}
@Override
public String[] indices() {
return new String[]{followRequest.getFollowerIndex()};
return new String[]{followerIndex};
}
@Override
@ -200,12 +165,13 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
public Request(StreamInput in) throws IOException {
super(in);
remoteCluster = in.readString();
leaderIndex = in.readString();
this.remoteCluster = in.readString();
this.leaderIndex = in.readString();
this.followerIndex = in.readString();
this.parameters = new FollowParameters(in);
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
waitForActiveShards(ActiveShardCount.readFrom(in));
}
followRequest = new ResumeFollowAction.Request(in);
}
@Override
@ -213,10 +179,11 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
super.writeTo(out);
out.writeString(remoteCluster);
out.writeString(leaderIndex);
out.writeString(followerIndex);
parameters.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
waitForActiveShards.writeTo(out);
}
followRequest.writeTo(out);
}
@Override
@ -225,7 +192,7 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
{
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
followRequest.toXContentFragment(builder, params);
parameters.toXContentFragment(builder);
}
builder.endObject();
return builder;
@ -238,24 +205,23 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
Request request = (Request) o;
return Objects.equals(remoteCluster, request.remoteCluster) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(waitForActiveShards, request.waitForActiveShards) &&
Objects.equals(followRequest, request.followRequest);
Objects.equals(followerIndex, request.followerIndex) &&
Objects.equals(parameters, request.parameters) &&
Objects.equals(waitForActiveShards, request.waitForActiveShards);
}
@Override
public int hashCode() {
return Objects.hash(remoteCluster, leaderIndex, waitForActiveShards, followRequest);
return Objects.hash(remoteCluster, leaderIndex, followerIndex, parameters, waitForActiveShards);
}
@Override
public String toString() {
return "PutFollowAction.Request{" +
"remoteCluster='" + remoteCluster + '\'' +
", leaderIndex='" + leaderIndex + '\'' +
", waitForActiveShards=" + waitForActiveShards +
", followRequest=" + followRequest +
'}';
// This class only exists for reuse of the FollowParameters class, see comment above the parser field.
private static class PutFollowParameters extends FollowParameters {
private String remoteCluster;
private String leaderIndex;
}
}
public static class Response extends ActionResponse implements ToXContentObject {

View File

@ -10,11 +10,8 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,8 +27,6 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
public static final ResumeFollowAction INSTANCE = new ResumeFollowAction();
public static final String NAME = "cluster:admin/xpack/ccr/resume_follow";
public static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
private ResumeFollowAction() {
super(NAME);
}
@ -43,65 +38,28 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");
static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size");
static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests");
static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count");
static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size");
static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests");
static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count");
static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout");
static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, Request::new);
// Note that Request should be the Value class here for this parser with a 'parameters' field that maps to FollowParameters class
// But since two minor version are already released with duplicate follow parameters in several APIs, FollowParameters
// is now the Value class here.
static final ObjectParser<FollowParameters, Void> PARSER = new ObjectParser<>(NAME, FollowParameters::new);
static {
PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD);
PARSER.declareInt(Request::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
Request::setMaxReadRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()), MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(Request::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareInt(Request::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT);
PARSER.declareField(Request::setMaxWriteRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()), MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(Request::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS);
PARSER.declareInt(Request::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
Request::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(
Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
MAX_RETRY_DELAY_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareField(
Request::setReadPollTimeout,
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
READ_POLL_TIMEOUT,
ObjectParser.ValueType.STRING);
FollowParameters.initParser(PARSER);
}
public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException {
Request request = PARSER.parse(parser, followerIndex);
if (followerIndex != null) {
if (request.followerIndex == null) {
request.followerIndex = followerIndex;
} else {
if (request.followerIndex.equals(followerIndex) == false) {
throw new IllegalArgumentException("provided follower_index is not equal");
}
}
}
FollowParameters parameters = PARSER.parse(parser, null);
Request request = new Request();
request.setFollowerIndex(followerIndex);
request.setParameters(parameters);
return request;
}
private String followerIndex;
private FollowParameters parameters = new FollowParameters();
public Request() {
}
public String getFollowerIndex() {
return followerIndex;
@ -111,261 +69,58 @@ public final class ResumeFollowAction extends Action<AcknowledgedResponse> {
this.followerIndex = followerIndex;
}
private Integer maxReadRequestOperationCount;
public Integer getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
public FollowParameters getParameters() {
return parameters;
}
public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) {
this.maxReadRequestOperationCount = maxReadRequestOperationCount;
}
private Integer maxOutstandingReadRequests;
public Integer getMaxOutstandingReadRequests() {
return maxOutstandingReadRequests;
}
public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) {
this.maxOutstandingReadRequests = maxOutstandingReadRequests;
}
private ByteSizeValue maxReadRequestSize;
public ByteSizeValue getMaxReadRequestSize() {
return maxReadRequestSize;
}
public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) {
this.maxReadRequestSize = maxReadRequestSize;
}
private Integer maxWriteRequestOperationCount;
public Integer getMaxWriteRequestOperationCount() {
return maxWriteRequestOperationCount;
}
public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) {
this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
}
private ByteSizeValue maxWriteRequestSize;
public ByteSizeValue getMaxWriteRequestSize() {
return maxWriteRequestSize;
}
public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) {
this.maxWriteRequestSize = maxWriteRequestSize;
}
private Integer maxOutstandingWriteRequests;
public Integer getMaxOutstandingWriteRequests() {
return maxOutstandingWriteRequests;
}
public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) {
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
}
private Integer maxWriteBufferCount;
public Integer getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public void setMaxWriteBufferCount(Integer maxWriteBufferCount) {
this.maxWriteBufferCount = maxWriteBufferCount;
}
private ByteSizeValue maxWriteBufferSize;
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) {
this.maxWriteBufferSize = maxWriteBufferSize;
}
private TimeValue maxRetryDelay;
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
private TimeValue readPollTimeout;
public TimeValue getReadPollTimeout() {
return readPollTimeout;
}
public void setReadPollTimeout(TimeValue readPollTimeout) {
this.readPollTimeout = readPollTimeout;
}
public Request() {
public void setParameters(FollowParameters parameters) {
this.parameters = parameters;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException e = null;
ActionRequestValidationException e = parameters.validate();
if (followerIndex == null) {
e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e);
e = addValidationError("follower_index is missing", e);
}
if (maxReadRequestOperationCount != null && maxReadRequestOperationCount < 1) {
e = addValidationError(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxReadRequestSize != null && maxReadRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_READ_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxOutstandingReadRequests != null && maxOutstandingReadRequests < 1) {
e = addValidationError(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteRequestOperationCount != null && maxWriteRequestOperationCount < 1) {
e = addValidationError(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteRequestSize != null && maxWriteRequestSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_WRITE_REQUEST_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxOutstandingWriteRequests != null && maxOutstandingWriteRequests < 1) {
e = addValidationError(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferCount != null && maxWriteBufferCount < 1) {
e = addValidationError(MAX_WRITE_BUFFER_COUNT.getPreferredName() + " must be larger than 0", e);
}
if (maxWriteBufferSize != null && maxWriteBufferSize.compareTo(ByteSizeValue.ZERO) <= 0) {
e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e);
}
if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) {
String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be positive but was [" +
maxRetryDelay.getStringRep() + "]";
e = addValidationError(message, e);
}
if (maxRetryDelay != null && maxRetryDelay.millis() > ResumeFollowAction.MAX_RETRY_DELAY.millis()) {
String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be less than [" + MAX_RETRY_DELAY +
"] but was [" + maxRetryDelay.getStringRep() + "]";
e = addValidationError(message, e);
}
return e;
}
public Request(StreamInput in) throws IOException {
super(in);
followerIndex = in.readString();
maxReadRequestOperationCount = in.readOptionalVInt();
maxOutstandingReadRequests = in.readOptionalVInt();
maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
maxWriteRequestOperationCount = in.readOptionalVInt();
maxWriteRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
maxOutstandingWriteRequests = in.readOptionalVInt();
maxWriteBufferCount = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalWriteable(ByteSizeValue::new);
maxRetryDelay = in.readOptionalTimeValue();
readPollTimeout = in.readOptionalTimeValue();
parameters = new FollowParameters(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(followerIndex);
out.writeOptionalVInt(maxReadRequestOperationCount);
out.writeOptionalVInt(maxOutstandingReadRequests);
out.writeOptionalWriteable(maxReadRequestSize);
out.writeOptionalVInt(maxWriteRequestOperationCount);
out.writeOptionalWriteable(maxWriteRequestSize);
out.writeOptionalVInt(maxOutstandingWriteRequests);
out.writeOptionalVInt(maxWriteBufferCount);
out.writeOptionalWriteable(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(readPollTimeout);
parameters.writeTo(out);
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
toXContentFragment(builder, params);
parameters.toXContentFragment(builder);
}
builder.endObject();
return builder;
}
void toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
if (maxReadRequestOperationCount != null) {
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
}
if (maxReadRequestSize != null) {
builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep());
}
if (maxWriteRequestOperationCount != null) {
builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount);
}
if (maxWriteRequestSize != null) {
builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep());
}
if (maxWriteBufferCount != null) {
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount);
}
if (maxWriteBufferSize != null) {
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep());
}
if (maxOutstandingReadRequests != null) {
builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests);
}
if (maxOutstandingWriteRequests != null) {
builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests);
}
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
}
if (readPollTimeout != null) {
builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep());
}
}
@Override
public boolean equals(final Object o) {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(maxReadRequestOperationCount, request.maxReadRequestOperationCount) &&
Objects.equals(maxReadRequestSize, request.maxReadRequestSize) &&
Objects.equals(maxOutstandingReadRequests, request.maxOutstandingReadRequests) &&
Objects.equals(maxWriteRequestOperationCount, request.maxWriteRequestOperationCount) &&
Objects.equals(maxWriteRequestSize, request.maxWriteRequestSize) &&
Objects.equals(maxOutstandingWriteRequests, request.maxOutstandingWriteRequests) &&
Objects.equals(maxWriteBufferCount, request.maxWriteBufferCount) &&
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
Objects.equals(readPollTimeout, request.readPollTimeout) &&
Objects.equals(followerIndex, request.followerIndex);
return Objects.equals(followerIndex, request.followerIndex) &&
Objects.equals(parameters, request.parameters);
}
@Override
public int hashCode() {
return Objects.hash(
followerIndex,
maxReadRequestOperationCount,
maxReadRequestSize,
maxOutstandingReadRequests,
maxWriteRequestOperationCount,
maxWriteRequestSize,
maxOutstandingWriteRequests,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout);
return Objects.hash(followerIndex, parameters);
}
}