Add ccr follow info api (#37408)

* Add ccr follow info api

This api returns all follower indices and per follower index
the provided parameters at put follow / resume follow time and
whether index following is paused or active.

Closes #37127

* iter

* [DOCS] Edits the get follower info API

* [DOCS] Fixes link to remote cluster

* [DOCS] Clarifies descriptions for configured parameters
This commit is contained in:
Martijn van Groningen 2019-01-18 16:37:21 +01:00 committed by GitHub
parent 377d96e376
commit 6846666b6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1048 additions and 19 deletions

View File

@ -22,6 +22,7 @@ You can use the following APIs to perform {ccr} operations.
* <<ccr-post-resume-follow,Resume follower>>
* <<ccr-post-unfollow,Convert follower index to a regular index>>
* <<ccr-get-follow-stats,Get stats about follower indices>>
* <<ccr-get-follow-info,Get info about follower indices>>
[float]
[[ccr-api-auto-follow]]
@ -40,6 +41,7 @@ include::follow/post-pause-follow.asciidoc[]
include::follow/post-resume-follow.asciidoc[]
include::follow/post-unfollow.asciidoc[]
include::follow/get-follow-stats.asciidoc[]
include::follow/get-follow-info.asciidoc[]
// auto-follow
include::auto-follow/put-auto-follow-pattern.asciidoc[]

View File

@ -0,0 +1,169 @@
[role="xpack"]
[testenv="platinum"]
[[ccr-get-follow-info]]
=== Get follower info API
++++
<titleabbrev>Get follower info</titleabbrev>
++++
beta[]
Retrieves information about all follower indices.
==== Description
This API lists the parameters and the status for each follower index.
For example, the results include follower index names, leader index names,
replication options and whether the follower indices are active or paused.
==== Request
//////////////////////////
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
}
--------------------------------------------------
// CONSOLE
// TESTSETUP
// TEST[setup:remote_cluster_and_leader_index]
[source,js]
--------------------------------------------------
POST /follower_index/_ccr/pause_follow
--------------------------------------------------
// CONSOLE
// TEARDOWN
//////////////////////////
[source,js]
--------------------------------------------------
GET /<index>/_ccr/info
--------------------------------------------------
// CONSOLE
// TEST[s/<index>/follower_index/]
==== Path Parameters
`index` ::
(string) A comma-delimited list of follower index patterns
==== Results
This API returns the following information:
`follower_indices`::
(array) An array of follower index statistics
The `indices` array consists of objects containing several fields:
`indices[].follower_index`::
(string) The name of the follower index
`indices[].remote_cluster`::
(string) The <<modules-remote-clusters,remote cluster>> that contains the
leader index
`indices[].leader_index`::
(string) The name of the index in the leader cluster that is followed
`indices[].status`::
(string) Whether index following is `active` or `paused`
`indices[].parameters`::
(object) An object that encapsulates {ccr} parameters
The `parameters` contains the following fields:
`indices[].parameters.max_read_request_operation_count`::
(integer) The maximum number of operations to pull per read from the remote
cluster
`indices[].parameters.max_outstanding_read_requests`::
(long) The maximum number of outstanding read requests from the remote cluster
`indices[].parameters.max_read_request_size`::
(<<byte-units,byte value>>) The maximum size in bytes of per read of a batch
of operations pulled from the remote cluster
`indices[].parameters.max_write_request_operation_count`::
(integer) The maximum number of operations per bulk write request executed on
the follower
`indices[].parameters.max_write_request_size`::
(<<byte-units,byte value>>) The maximum total bytes of operations per bulk
write request executed on the follower
`indices[].parameters.max_outstanding_write_requests`::
(integer) The maximum number of outstanding write requests on the follower
`indices[].parameters.max_write_buffer_count`::
(integer) The maximum number of operations that can be queued for writing.
When this limit is reached, reads from the remote cluster are deferred until
the number of queued operations goes below the limit
`indices[].parameters.max_write_buffer_size`::
(<<byte-units,byte value>>) The maximum total bytes of operations that can be
queued for writing. When this limit is reached, reads from the remote cluster
are deferred until the total bytes of queued operations goes below the limit
`indices[].parameters.max_retry_delay`::
(<<time-units,time value>>) The maximum time to wait before retrying an
operation that failed exceptionally. An exponential backoff strategy is
employed when retrying
`indices[].parameters.read_poll_timeout`::
(<<time-units,time value>>) The maximum time to wait for new operations on the
remote cluster when the follower index is synchronized with the leader index.
When the timeout has elapsed, the poll for operations returns to the follower
so that it can update some statistics, then the follower immediately attempts
to read from the leader again
==== Authorization
If the {es} {security-features} are enabled, you must have `monitor` cluster
privileges. For more information, see
{stack-ov}/security-privileges.html[Security privileges].
==== Example
This example retrieves follower info:
[source,js]
--------------------------------------------------
GET /follower_index/_ccr/info
--------------------------------------------------
// CONSOLE
The API returns the following results:
[source,js]
--------------------------------------------------
{
"follower_indices" : [
{
"follower_index" : "follower_index",
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index",
"status" : "active",
"parameters" : {
"max_read_request_operation_count" : 5120,
"max_read_request_size" : "32mb",
"max_outstanding_read_requests" : 12,
"max_write_request_operation_count" : 5120,
"max_write_request_size" : "9223372036854775807b",
"max_outstanding_write_requests" : 9,
"max_write_buffer_count" : 2147483647,
"max_write_buffer_size" : "512mb",
"max_retry_delay" : "500ms",
"read_poll_timeout" : "1m"
}
}
]
}
--------------------------------------------------
// TESTRESPONSE

View File

@ -0,0 +1,75 @@
---
"Test info":
- do:
cluster.state: {}
- set: {master_node: master}
- do:
nodes.info: {}
- set: {nodes.$master.transport_address: local_ip}
- do:
cluster.put_settings:
body:
transient:
cluster.remote.local.seeds: $local_ip
flat_settings: true
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
- do:
indices.create:
index: foo
body:
settings:
index:
number_of_shards: 1
number_of_replicas: 0
soft_deletes:
enabled: true
- do:
ccr.follow:
index: bar
body:
remote_cluster: local
leader_index: foo
- is_true: follow_index_created
- is_true: follow_index_shards_acked
- is_true: index_following_started
- do:
ccr.follow_info:
index: bar
- length: {follower_indices: 1}
- match: {follower_indices.0.follower_index: "bar"}
- match: {follower_indices.0.remote_cluster: "local"}
- match: {follower_indices.0.leader_index: "foo"}
- match: {follower_indices.0.status: "active"}
- match: {follower_indices.0.parameters.max_read_request_operation_count: 5120}
- match: {follower_indices.0.parameters.max_read_request_size: "32mb"}
- match: {follower_indices.0.parameters.max_outstanding_read_requests: 12}
- match: {follower_indices.0.parameters.max_write_request_operation_count: 5120}
- match: {follower_indices.0.parameters.max_write_request_size: "9223372036854775807b"}
- match: {follower_indices.0.parameters.max_outstanding_write_requests: 9}
- match: {follower_indices.0.parameters.max_write_buffer_count: 2147483647,}
- match: {follower_indices.0.parameters.max_write_buffer_size: "512mb"}
- match: {follower_indices.0.parameters.max_retry_delay: "500ms"}
- match: {follower_indices.0.parameters.read_poll_timeout: "1m"}
- do:
ccr.pause_follow:
index: bar
- is_true: acknowledged
- do:
ccr.follow_info:
index: bar
- length: {follower_indices: 1}
- match: {follower_indices.0.follower_index: "bar"}
- match: {follower_indices.0.remote_cluster: "local"}
- match: {follower_indices.0.leader_index: "foo"}
- match: {follower_indices.0.status: "paused"}
- is_false: follower_indices.0.parameters

View File

@ -50,6 +50,7 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction;
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
@ -69,6 +70,7 @@ import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
@ -80,6 +82,7 @@ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
@ -202,6 +205,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
// stats action
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
new ActionHandler<>(FollowInfoAction.INSTANCE, TransportFollowInfoAction.class),
// follow actions
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
@ -225,6 +229,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
// stats API
new RestFollowStatsAction(settings, restController),
new RestCcrStatsAction(settings, restController),
new RestFollowInfoAction(settings, restController),
// follow APIs
new RestPutFollowAction(settings, restController),
new RestResumeFollowAction(settings, restController),

View File

@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class TransportFollowInfoAction extends TransportMasterNodeReadAction<FollowInfoAction.Request, FollowInfoAction.Response> {
@Inject
public TransportFollowInfoAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(FollowInfoAction.NAME, transportService, clusterService, threadPool, actionFilters, FollowInfoAction.Request::new,
indexNameExpressionResolver);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected FollowInfoAction.Response newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
protected FollowInfoAction.Response read(StreamInput in) throws IOException {
return new FollowInfoAction.Response(in);
}
@Override
protected void masterOperation(FollowInfoAction.Request request,
ClusterState state,
ActionListener<FollowInfoAction.Response> listener) throws Exception {
List<String> concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices()));
List<FollowerInfo> followerInfos = new ArrayList<>();
PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
for (IndexMetaData indexMetaData : state.metaData()) {
Map<String, String> ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
if (ccrCustomData != null) {
Optional<ShardFollowTask> result;
if (persistentTasks != null) {
result = persistentTasks.taskMap().values().stream()
.map(persistentTask -> (ShardFollowTask) persistentTask.getParams())
.filter(shardFollowTask -> concreteFollowerIndices.isEmpty() ||
concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName()))
.findAny();
} else {
result = Optional.empty();
}
String followerIndex = indexMetaData.getIndex().getName();
String remoteCluster = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);
String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
if (result.isPresent()) {
ShardFollowTask params = result.get();
FollowParameters followParameters = new FollowParameters(
params.getMaxReadRequestOperationCount(),
params.getMaxReadRequestSize(),
params.getMaxOutstandingReadRequests(),
params.getMaxWriteRequestOperationCount(),
params.getMaxWriteRequestSize(),
params.getMaxOutstandingWriteRequests(),
params.getMaxWriteBufferCount(),
params.getMaxWriteBufferSize(),
params.getMaxRetryDelay(),
params.getReadPollTimeout()
);
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters));
} else {
followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null));
}
}
}
listener.onResponse(new FollowInfoAction.Response(followerInfos));
}
@Override
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import java.io.IOException;
public class RestFollowInfoAction extends BaseRestHandler {
public RestFollowInfoAction(final Settings settings, final RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, "/{index}/_ccr/info", this);
}
@Override
public String getName() {
return "ccr_follower_info";
}
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException {
final FollowInfoAction.Request request = new FollowInfoAction.Request();
request.setFollowerIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -16,14 +16,16 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import java.util.Arrays;
@ -223,42 +225,52 @@ public class AutoFollowIT extends CcrIntegTestCase {
createLeaderIndex("logs-201901", leaderIndexSettings);
assertBusy(() -> {
PersistentTasksCustomMetaData persistentTasksMetaData =
followerClient().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(persistentTasksMetaData, notNullValue());
assertThat(persistentTasksMetaData.tasks().size(), equalTo(1));
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams();
assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901"));
assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901"));
FollowInfoAction.Request followInfoRequest = new FollowInfoAction.Request();
followInfoRequest.setFollowerIndices("copy-logs-201901");
FollowInfoAction.Response followInfoResponse;
try {
followInfoResponse = followerClient().execute(FollowInfoAction.INSTANCE, followInfoRequest).actionGet();
} catch (IndexNotFoundException e) {
throw new AssertionError(e);
}
assertThat(followInfoResponse.getFollowInfos().size(), equalTo(1));
FollowerInfo followerInfo = followInfoResponse.getFollowInfos().get(0);
assertThat(followerInfo.getFollowerIndex(), equalTo("copy-logs-201901"));
assertThat(followerInfo.getRemoteCluster(), equalTo("leader_cluster"));
assertThat(followerInfo.getLeaderIndex(), equalTo("logs-201901"));
FollowParameters followParameters = followerInfo.getParameters();
assertThat(followParameters, notNullValue());
if (request.getMaxWriteBufferCount() != null) {
assertThat(shardFollowTask.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount()));
assertThat(followParameters.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount()));
}
if (request.getMaxWriteBufferSize() != null) {
assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize()));
assertThat(followParameters.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize()));
}
if (request.getMaxConcurrentReadBatches() != null) {
assertThat(shardFollowTask.getMaxOutstandingReadRequests(), equalTo(request.getMaxConcurrentReadBatches()));
assertThat(followParameters.getMaxOutstandingReadRequests(), equalTo(request.getMaxConcurrentReadBatches()));
}
if (request.getMaxConcurrentWriteBatches() != null) {
assertThat(shardFollowTask.getMaxOutstandingWriteRequests(), equalTo(request.getMaxConcurrentWriteBatches()));
assertThat(followParameters.getMaxOutstandingWriteRequests(), equalTo(request.getMaxConcurrentWriteBatches()));
}
if (request.getMaxReadRequestOperationCount() != null) {
assertThat(shardFollowTask.getMaxReadRequestOperationCount(), equalTo(request.getMaxReadRequestOperationCount()));
assertThat(followParameters.getMaxReadRequestOperationCount(), equalTo(request.getMaxReadRequestOperationCount()));
}
if (request.getMaxReadRequestSize() != null) {
assertThat(shardFollowTask.getMaxReadRequestSize(), equalTo(request.getMaxReadRequestSize()));
assertThat(followParameters.getMaxReadRequestSize(), equalTo(request.getMaxReadRequestSize()));
}
if (request.getMaxRetryDelay() != null) {
assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
assertThat(followParameters.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
}
if (request.getReadPollTimeout() != null) {
assertThat(shardFollowTask.getReadPollTimeout(), equalTo(request.getReadPollTimeout()));
assertThat(followParameters.getReadPollTimeout(), equalTo(request.getReadPollTimeout()));
}
if (request.getMaxWriteRequestOperationCount() != null) {
assertThat(shardFollowTask.getMaxWriteRequestOperationCount(), equalTo(request.getMaxWriteRequestOperationCount()));
assertThat(followParameters.getMaxWriteRequestOperationCount(), equalTo(request.getMaxWriteRequestOperationCount()));
}
if (request.getMaxWriteRequestSize() != null) {
assertThat(shardFollowTask.getMaxWriteRequestSize(), equalTo(request.getMaxWriteRequestSize()));
assertThat(followParameters.getMaxWriteRequestSize(), equalTo(request.getMaxWriteRequestSize()));
}
});
}

View File

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
public class FollowInfoRequestTests extends AbstractWireSerializingTestCase<FollowInfoAction.Request> {
@Override
protected Writeable.Reader<FollowInfoAction.Request> instanceReader() {
return FollowInfoAction.Request::new;
}
@Override
protected FollowInfoAction.Request createTestInstance() {
FollowInfoAction.Request request = new FollowInfoAction.Request();
request.setFollowerIndices(generateRandomStringArray(4, 4, true, false));
return request;
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
public class FollowInfoResponseTests extends AbstractSerializingTestCase<FollowInfoAction.Response> {
static final ConstructingObjectParser<FollowParameters, Void> PARAMETERS_PARSER = new ConstructingObjectParser<>(
"parameters_parser",
args -> {
return new FollowParameters(
(Integer) args[0],
(ByteSizeValue) args[1],
(Integer) args[2],
(Integer) args[3],
(ByteSizeValue) args[4],
(Integer) args[5],
(Integer) args[6],
(ByteSizeValue) args[7],
(TimeValue) args[8],
(TimeValue) args[9]
);
});
static {
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_READ_REQUEST_OPERATION_COUNT);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_READ_REQUEST_SIZE.getPreferredName()),
ShardFollowTask.MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_READ_REQUESTS);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_REQUEST_OPERATION_COUNT);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_REQUEST_SIZE.getPreferredName()),
ShardFollowTask.MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_WRITE_REQUESTS);
PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_COUNT);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName()),
ShardFollowTask.MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.MAX_RETRY_DELAY.getPreferredName()),
ShardFollowTask.MAX_RETRY_DELAY,
ObjectParser.ValueType.STRING);
PARAMETERS_PARSER.declareField(
ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.READ_POLL_TIMEOUT.getPreferredName()),
ShardFollowTask.READ_POLL_TIMEOUT,
ObjectParser.ValueType.STRING);
}
static final ConstructingObjectParser<FollowerInfo, Void> INFO_PARSER = new ConstructingObjectParser<>(
"info_parser",
args -> {
return new FollowerInfo(
(String) args[0],
(String) args[1],
(String) args[2],
Status.fromString((String) args[3]),
(FollowParameters) args[4]
);
});
static {
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.FOLLOWER_INDEX_FIELD);
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.REMOTE_CLUSTER_FIELD);
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.LEADER_INDEX_FIELD);
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.STATUS_FIELD);
INFO_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), PARAMETERS_PARSER, FollowerInfo.PARAMETERS_FIELD);
}
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<FollowInfoAction.Response, Void> PARSER = new ConstructingObjectParser<>(
"response",
args -> {
return new FollowInfoAction.Response(
(List<FollowerInfo>) args[0]
);
});
static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), INFO_PARSER, FOLLOWER_INDICES_FIELD);
}
@Override
protected FollowInfoAction.Response doParseInstance(XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}
@Override
protected Writeable.Reader<FollowInfoAction.Response> instanceReader() {
return FollowInfoAction.Response::new;
}
@Override
protected FollowInfoAction.Response createTestInstance() {
int numInfos = randomIntBetween(0, 32);
List<FollowerInfo> infos = new ArrayList<>(numInfos);
for (int i = 0; i < numInfos; i++) {
FollowParameters followParameters = null;
if (randomBoolean()) {
followParameters = new FollowParameters(
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
new ByteSizeValue(randomNonNegativeLong()),
new TimeValue(randomNonNegativeLong()),
new TimeValue(randomNonNegativeLong())
);
}
infos.add(new FollowerInfo(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4),
randomFrom(Status.values()), followParameters));
}
return new FollowInfoAction.Response(infos);
}
}

View File

@ -0,0 +1,422 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.READ_POLL_TIMEOUT;
public class FollowInfoAction extends Action<FollowInfoAction.Response> {
public static final String NAME = "cluster:monitor/ccr/follow_info";
public static final FollowInfoAction INSTANCE = new FollowInfoAction();
private FollowInfoAction() {
super(NAME);
}
@Override
public Response newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends MasterNodeReadRequest<Request> {
private String[] followerIndices;
public Request() {
}
public String[] getFollowerIndices() {
return followerIndices;
}
public void setFollowerIndices(String... followerIndices) {
this.followerIndices = followerIndices;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public Request(StreamInput in) throws IOException {
super(in);
followerIndices = in.readOptionalStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(followerIndices);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Arrays.equals(followerIndices, request.followerIndices);
}
@Override
public int hashCode() {
return Arrays.hashCode(followerIndices);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
public static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices");
private final List<FollowerInfo> followInfos;
public Response(List<FollowerInfo> followInfos) {
this.followInfos = followInfos;
}
public List<FollowerInfo> getFollowInfos() {
return followInfos;
}
public Response(StreamInput in) throws IOException {
super(in);
followInfos = in.readList(FollowerInfo::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(followInfos);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(FOLLOWER_INDICES_FIELD.getPreferredName());
for (FollowerInfo followInfo : followInfos) {
followInfo.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(followInfos, response.followInfos);
}
@Override
public int hashCode() {
return Objects.hash(followInfos);
}
public String toString() {
return Strings.toString(this);
}
public static class FollowerInfo implements Writeable, ToXContentObject {
public static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
public static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField PARAMETERS_FIELD = new ParseField("parameters");
private final String followerIndex;
private final String remoteCluster;
private final String leaderIndex;
private final Status status;
private final FollowParameters parameters;
public FollowerInfo(String followerIndex, String remoteCluster, String leaderIndex, Status status,
FollowParameters parameters) {
this.followerIndex = followerIndex;
this.remoteCluster = remoteCluster;
this.leaderIndex = leaderIndex;
this.status = status;
this.parameters = parameters;
}
public String getFollowerIndex() {
return followerIndex;
}
public String getRemoteCluster() {
return remoteCluster;
}
public String getLeaderIndex() {
return leaderIndex;
}
public Status getStatus() {
return status;
}
public FollowParameters getParameters() {
return parameters;
}
FollowerInfo(StreamInput in) throws IOException {
followerIndex = in.readString();
remoteCluster = in.readString();
leaderIndex = in.readString();
status = Status.fromString(in.readString());
parameters = in.readOptionalWriteable(FollowParameters::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(followerIndex);
out.writeString(remoteCluster);
out.writeString(leaderIndex);
out.writeString(status.name);
out.writeOptionalWriteable(parameters);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(STATUS_FIELD.getPreferredName(), status.name);
if (parameters != null) {
builder.startObject(PARAMETERS_FIELD.getPreferredName());
{
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxReadRequestOperationCount);
builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), parameters.maxReadRequestSize.getStringRep());
builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), parameters.maxOutstandingReadRequests);
builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxWriteRequestOperationCount);
builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), parameters.maxWriteRequestSize.getStringRep());
builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), parameters.maxOutstandingWriteRequests);
builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), parameters.maxWriteBufferCount);
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), parameters.maxWriteBufferSize.getStringRep());
builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), parameters.maxRetryDelay.getStringRep());
builder.field(READ_POLL_TIMEOUT.getPreferredName(), parameters.readPollTimeout.getStringRep());
}
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowerInfo that = (FollowerInfo) o;
return Objects.equals(followerIndex, that.followerIndex) &&
Objects.equals(remoteCluster, that.remoteCluster) &&
Objects.equals(leaderIndex, that.leaderIndex) &&
status == that.status &&
Objects.equals(parameters, that.parameters);
}
@Override
public int hashCode() {
return Objects.hash(followerIndex, remoteCluster, leaderIndex, status, parameters);
}
public String toString() {
return Strings.toString(this);
}
}
public static class FollowParameters implements Writeable {
private final int maxReadRequestOperationCount;
private final ByteSizeValue maxReadRequestSize;
private final int maxOutstandingReadRequests;
private final int maxWriteRequestOperationCount;
private final ByteSizeValue maxWriteRequestSize;
private final int maxOutstandingWriteRequests;
private final int maxWriteBufferCount;
private final ByteSizeValue maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue readPollTimeout;
public FollowParameters(int maxReadRequestOperationCount,
ByteSizeValue maxReadRequestSize, int maxOutstandingReadRequests,
int maxWriteRequestOperationCount, ByteSizeValue maxWriteRequestSize,
int maxOutstandingWriteRequests, int maxWriteBufferCount,
ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue readPollTimeout) {
this.maxReadRequestOperationCount = maxReadRequestOperationCount;
this.maxReadRequestSize = maxReadRequestSize;
this.maxOutstandingReadRequests = maxOutstandingReadRequests;
this.maxWriteRequestOperationCount = maxWriteRequestOperationCount;
this.maxWriteRequestSize = maxWriteRequestSize;
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
this.maxWriteBufferCount = maxWriteBufferCount;
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.readPollTimeout = readPollTimeout;
}
public int getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
}
public ByteSizeValue getMaxReadRequestSize() {
return maxReadRequestSize;
}
public int getMaxOutstandingReadRequests() {
return maxOutstandingReadRequests;
}
public int getMaxWriteRequestOperationCount() {
return maxWriteRequestOperationCount;
}
public ByteSizeValue getMaxWriteRequestSize() {
return maxWriteRequestSize;
}
public int getMaxOutstandingWriteRequests() {
return maxOutstandingWriteRequests;
}
public int getMaxWriteBufferCount() {
return maxWriteBufferCount;
}
public ByteSizeValue getMaxWriteBufferSize() {
return maxWriteBufferSize;
}
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}
public TimeValue getReadPollTimeout() {
return readPollTimeout;
}
FollowParameters(StreamInput in) throws IOException {
this.maxReadRequestOperationCount = in.readVInt();
this.maxReadRequestSize = new ByteSizeValue(in);
this.maxOutstandingReadRequests = in.readVInt();
this.maxWriteRequestOperationCount = in.readVInt();
this.maxWriteRequestSize = new ByteSizeValue(in);
this.maxOutstandingWriteRequests = in.readVInt();
this.maxWriteBufferCount = in.readVInt();
this.maxWriteBufferSize = new ByteSizeValue(in);
this.maxRetryDelay = in.readTimeValue();
this.readPollTimeout = in.readTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxReadRequestOperationCount);
maxReadRequestSize.writeTo(out);
out.writeVInt(maxOutstandingReadRequests);
out.writeVLong(maxWriteRequestOperationCount);
maxWriteRequestSize.writeTo(out);
out.writeVInt(maxOutstandingWriteRequests);
out.writeVInt(maxWriteBufferCount);
maxWriteBufferSize.writeTo(out);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(readPollTimeout);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowParameters that = (FollowParameters) o;
return maxReadRequestOperationCount == that.maxReadRequestOperationCount &&
maxOutstandingReadRequests == that.maxOutstandingReadRequests &&
maxWriteRequestOperationCount == that.maxWriteRequestOperationCount &&
maxOutstandingWriteRequests == that.maxOutstandingWriteRequests &&
maxWriteBufferCount == that.maxWriteBufferCount &&
Objects.equals(maxReadRequestSize, that.maxReadRequestSize) &&
Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) &&
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(readPollTimeout, that.readPollTimeout);
}
@Override
public int hashCode() {
return Objects.hash(
maxReadRequestOperationCount,
maxReadRequestSize,
maxOutstandingReadRequests,
maxWriteRequestOperationCount,
maxWriteRequestSize,
maxOutstandingWriteRequests,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout
);
}
}
public enum Status {
ACTIVE("active"),
PAUSED("paused");
private final String name;
Status(String name) {
this.name = name;
}
public static Status fromString(String value) {
switch (value) {
case "active":
return Status.ACTIVE;
case "paused":
return Status.PAUSED;
default:
throw new IllegalArgumentException("unexpected status value [" + value + "]");
}
}
}
}
}

View File

@ -0,0 +1,16 @@
{
"ccr.follow_info": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html",
"methods": [ "GET" ],
"url": {
"path": "/{index}/_ccr/info",
"paths": [ "/{index}/_ccr/info" ],
"parts": {
"index": {
"type": "list",
"description": "A comma-separated list of index patterns; use `_all` to perform the operation on all indices"
}
}
}
}
}