[CCR] Renamed leader_cluster to remote_cluster (#34776)
and also some occurrences of clusterAlias to remoteCluster. Closes #34682
This commit is contained in:
parent
be907516ad
commit
76240e6bbe
|
@ -25,7 +25,7 @@ public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase {
|
||||||
public void testFollow() {
|
public void testFollow() {
|
||||||
if ("follow".equals(targetCluster)) {
|
if ("follow".equals(targetCluster)) {
|
||||||
final Request request = new Request("PUT", "/follower/_ccr/follow");
|
final Request request = new Request("PUT", "/follower/_ccr/follow");
|
||||||
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}");
|
request.setJsonEntity("{\"remote_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}");
|
||||||
assertNonCompliantLicense(request);
|
assertNonCompliantLicense(request);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,7 @@ public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase {
|
||||||
assumeFalse("windows is the worst", Constants.WINDOWS);
|
assumeFalse("windows is the worst", Constants.WINDOWS);
|
||||||
if ("follow".equals(targetCluster)) {
|
if ("follow".equals(targetCluster)) {
|
||||||
final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
||||||
request.setJsonEntity("{\"leader_index_patterns\":[\"*\"], \"leader_cluster\": \"leader_cluster\"}");
|
request.setJsonEntity("{\"leader_index_patterns\":[\"*\"], \"remote_cluster\": \"leader_cluster\"}");
|
||||||
client().performRequest(request);
|
client().performRequest(request);
|
||||||
|
|
||||||
// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
|
// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
|
||||||
|
|
|
@ -132,13 +132,13 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
|
||||||
|
|
||||||
{
|
{
|
||||||
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
||||||
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"leader_cluster\": \"leader_cluster\"}");
|
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
|
||||||
Exception e = expectThrows(ResponseException.class, () -> assertOK(client().performRequest(request)));
|
Exception e = expectThrows(ResponseException.class, () -> assertOK(client().performRequest(request)));
|
||||||
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [logs-*]"));
|
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [logs-*]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
||||||
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-eu-*\"], \"leader_cluster\": \"leader_cluster\"}");
|
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-eu-*\"], \"remote_cluster\": \"leader_cluster\"}");
|
||||||
assertOK(client().performRequest(request));
|
assertOK(client().performRequest(request));
|
||||||
|
|
||||||
try (RestClient leaderClient = buildLeaderClient()) {
|
try (RestClient leaderClient = buildLeaderClient()) {
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class FollowIndexIT extends ESCCRRestTestCase {
|
||||||
assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster));
|
assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster));
|
||||||
|
|
||||||
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
||||||
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"leader_cluster\": \"leader_cluster\"}");
|
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
|
||||||
assertOK(client().performRequest(request));
|
assertOK(client().performRequest(request));
|
||||||
|
|
||||||
try (RestClient leaderClient = buildLeaderClient()) {
|
try (RestClient leaderClient = buildLeaderClient()) {
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
ccr.put_auto_follow_pattern:
|
ccr.put_auto_follow_pattern:
|
||||||
name: my_pattern
|
name: my_pattern
|
||||||
body:
|
body:
|
||||||
leader_cluster: local
|
remote_cluster: local
|
||||||
leader_index_patterns: ['logs-*']
|
leader_index_patterns: ['logs-*']
|
||||||
max_concurrent_read_batches: 2
|
max_concurrent_read_batches: 2
|
||||||
- is_true: acknowledged
|
- is_true: acknowledged
|
||||||
|
@ -31,13 +31,13 @@
|
||||||
- do:
|
- do:
|
||||||
ccr.get_auto_follow_pattern:
|
ccr.get_auto_follow_pattern:
|
||||||
name: my_pattern
|
name: my_pattern
|
||||||
- match: { my_pattern.leader_cluster: 'local' }
|
- match: { my_pattern.remote_cluster: 'local' }
|
||||||
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
|
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
|
||||||
- match: { my_pattern.max_concurrent_read_batches: 2 }
|
- match: { my_pattern.max_concurrent_read_batches: 2 }
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
ccr.get_auto_follow_pattern: {}
|
ccr.get_auto_follow_pattern: {}
|
||||||
- match: { my_pattern.leader_cluster: 'local' }
|
- match: { my_pattern.remote_cluster: 'local' }
|
||||||
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
|
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
|
||||||
- match: { my_pattern.max_concurrent_read_batches: 2 }
|
- match: { my_pattern.max_concurrent_read_batches: 2 }
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
ccr.follow:
|
ccr.follow:
|
||||||
index: bar
|
index: bar
|
||||||
body:
|
body:
|
||||||
leader_cluster: local
|
remote_cluster: local
|
||||||
leader_index: foo
|
leader_index: foo
|
||||||
- is_true: follow_index_created
|
- is_true: follow_index_created
|
||||||
- is_true: follow_index_shards_acked
|
- is_true: follow_index_shards_acked
|
||||||
|
|
|
@ -37,7 +37,7 @@
|
||||||
ccr.follow:
|
ccr.follow:
|
||||||
index: bar
|
index: bar
|
||||||
body:
|
body:
|
||||||
leader_cluster: local
|
remote_cluster: local
|
||||||
leader_index: foo
|
leader_index: foo
|
||||||
- is_true: follow_index_created
|
- is_true: follow_index_created
|
||||||
- is_true: follow_index_shards_acked
|
- is_true: follow_index_shards_acked
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class ESCCRRestTestCase extends ESRestTestCase {
|
||||||
|
|
||||||
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
|
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
|
||||||
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
|
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
|
||||||
request.setJsonEntity("{\"leader_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
|
request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
|
||||||
"\", \"poll_timeout\": \"10ms\"}");
|
"\", \"poll_timeout\": \"10ms\"}");
|
||||||
assertOK(client.performRequest(request));
|
assertOK(client.performRequest(request));
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
||||||
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
|
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
|
||||||
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
|
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
|
||||||
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
|
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
|
||||||
public static final String CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY = "leader_cluster_name";
|
public static final String CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY = "remote_cluster_name";
|
||||||
|
|
||||||
private final boolean enabled;
|
private final boolean enabled;
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
|
|
|
@ -159,7 +159,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void getLeaderClusterState(final Map<String, String> headers,
|
void getLeaderClusterState(final Map<String, String> headers,
|
||||||
final String leaderClusterAlias,
|
final String remoteCluster,
|
||||||
final BiConsumer<ClusterState, Exception> handler) {
|
final BiConsumer<ClusterState, Exception> handler) {
|
||||||
final ClusterStateRequest request = new ClusterStateRequest();
|
final ClusterStateRequest request = new ClusterStateRequest();
|
||||||
request.clear();
|
request.clear();
|
||||||
|
@ -168,7 +168,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
|
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
|
||||||
client,
|
client,
|
||||||
headers,
|
headers,
|
||||||
leaderClusterAlias,
|
remoteCluster,
|
||||||
request,
|
request,
|
||||||
e -> handler.accept(null, e),
|
e -> handler.accept(null, e),
|
||||||
leaderClusterState -> handler.accept(leaderClusterState, null));
|
leaderClusterState -> handler.accept(leaderClusterState, null));
|
||||||
|
@ -245,14 +245,14 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
final int slot = i;
|
final int slot = i;
|
||||||
final String autoFollowPattenName = entry.getKey();
|
final String autoFollowPattenName = entry.getKey();
|
||||||
final AutoFollowPattern autoFollowPattern = entry.getValue();
|
final AutoFollowPattern autoFollowPattern = entry.getValue();
|
||||||
final String leaderCluster = autoFollowPattern.getLeaderCluster();
|
final String remoteCluster = autoFollowPattern.getRemoteCluster();
|
||||||
|
|
||||||
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
|
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
|
||||||
getLeaderClusterState(headers, leaderCluster, (leaderClusterState, e) -> {
|
getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> {
|
||||||
if (leaderClusterState != null) {
|
if (leaderClusterState != null) {
|
||||||
assert e == null;
|
assert e == null;
|
||||||
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
|
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
|
||||||
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(leaderCluster, autoFollowPattern,
|
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(remoteCluster, autoFollowPattern,
|
||||||
leaderClusterState, followerClusterState, followedIndices);
|
leaderClusterState, followerClusterState, followedIndices);
|
||||||
if (leaderIndicesToFollow.isEmpty()) {
|
if (leaderIndicesToFollow.isEmpty()) {
|
||||||
finalise(slot, new AutoFollowResult(autoFollowPattenName));
|
finalise(slot, new AutoFollowResult(autoFollowPattenName));
|
||||||
|
@ -260,12 +260,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns()
|
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns()
|
||||||
.entrySet().stream()
|
.entrySet().stream()
|
||||||
.filter(item -> autoFollowPattenName.equals(item.getKey()) == false)
|
.filter(item -> autoFollowPattenName.equals(item.getKey()) == false)
|
||||||
.filter(item -> leaderCluster.equals(item.getValue().getLeaderCluster()))
|
.filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster()))
|
||||||
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
|
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
|
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
|
||||||
checkAutoFollowPattern(autoFollowPattenName, leaderCluster, autoFollowPattern, leaderIndicesToFollow, headers,
|
checkAutoFollowPattern(autoFollowPattenName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
|
||||||
patternsForTheSameLeaderCluster, resultHandler);
|
patternsForTheSameLeaderCluster, resultHandler);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -313,7 +313,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void followLeaderIndex(String autoFollowPattenName,
|
private void followLeaderIndex(String autoFollowPattenName,
|
||||||
String leaderCluster,
|
String remoteCluster,
|
||||||
Index indexToFollow,
|
Index indexToFollow,
|
||||||
AutoFollowPattern pattern,
|
AutoFollowPattern pattern,
|
||||||
Map<String,String> headers,
|
Map<String,String> headers,
|
||||||
|
@ -332,7 +332,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
followRequest.setPollTimeout(pattern.getPollTimeout());
|
followRequest.setPollTimeout(pattern.getPollTimeout());
|
||||||
|
|
||||||
PutFollowAction.Request request = new PutFollowAction.Request();
|
PutFollowAction.Request request = new PutFollowAction.Request();
|
||||||
request.setLeaderCluster(leaderCluster);
|
request.setRemoteCluster(remoteCluster);
|
||||||
request.setLeaderIndex(indexToFollow.getName());
|
request.setLeaderIndex(indexToFollow.getName());
|
||||||
request.setFollowRequest(followRequest);
|
request.setFollowRequest(followRequest);
|
||||||
|
|
||||||
|
@ -357,7 +357,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static List<Index> getLeaderIndicesToFollow(String clusterAlias,
|
static List<Index> getLeaderIndicesToFollow(String remoteCluster,
|
||||||
AutoFollowPattern autoFollowPattern,
|
AutoFollowPattern autoFollowPattern,
|
||||||
ClusterState leaderClusterState,
|
ClusterState leaderClusterState,
|
||||||
ClusterState followerClusterState,
|
ClusterState followerClusterState,
|
||||||
|
@ -409,12 +409,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
||||||
* Fetch the cluster state from the leader with the specified cluster alias
|
* Fetch the cluster state from the leader with the specified cluster alias
|
||||||
*
|
*
|
||||||
* @param headers the client headers
|
* @param headers the client headers
|
||||||
* @param leaderClusterAlias the cluster alias of the leader
|
* @param remoteCluster the name of the leader cluster
|
||||||
* @param handler the callback to invoke
|
* @param handler the callback to invoke
|
||||||
*/
|
*/
|
||||||
abstract void getLeaderClusterState(
|
abstract void getLeaderClusterState(
|
||||||
Map<String, String> headers,
|
Map<String, String> headers,
|
||||||
String leaderClusterAlias,
|
String remoteCluster,
|
||||||
BiConsumer<ClusterState, Exception> handler
|
BiConsumer<ClusterState, Exception> handler
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -443,7 +443,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
||||||
timeSinceLastFetchMillis = -1;
|
timeSinceLastFetchMillis = -1;
|
||||||
}
|
}
|
||||||
return new ShardFollowNodeTaskStatus(
|
return new ShardFollowNodeTaskStatus(
|
||||||
params.getLeaderCluster(),
|
params.getRemoteCluster(),
|
||||||
params.getLeaderShardId().getIndexName(),
|
params.getLeaderShardId().getIndexName(),
|
||||||
params.getFollowShardId().getIndexName(),
|
params.getFollowShardId().getIndexName(),
|
||||||
getFollowShardId().getId(),
|
getFollowShardId().getId(),
|
||||||
|
|
|
@ -36,7 +36,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
public static final Set<String> HEADER_FILTERS =
|
public static final Set<String> HEADER_FILTERS =
|
||||||
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")));
|
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")));
|
||||||
|
|
||||||
static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster");
|
static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
|
||||||
static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index");
|
static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index");
|
||||||
static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid");
|
static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid");
|
||||||
static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard");
|
static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard");
|
||||||
|
@ -59,7 +59,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));
|
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_FIELD);
|
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REMOTE_CLUSTER_FIELD);
|
||||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
|
||||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD);
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD);
|
||||||
PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD);
|
PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD);
|
||||||
|
@ -84,7 +84,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
|
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String leaderCluster;
|
private final String remoteCluster;
|
||||||
private final ShardId followShardId;
|
private final ShardId followShardId;
|
||||||
private final ShardId leaderShardId;
|
private final ShardId leaderShardId;
|
||||||
private final int maxBatchOperationCount;
|
private final int maxBatchOperationCount;
|
||||||
|
@ -97,7 +97,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
private final Map<String, String> headers;
|
private final Map<String, String> headers;
|
||||||
|
|
||||||
ShardFollowTask(
|
ShardFollowTask(
|
||||||
final String leaderCluster,
|
final String remoteCluster,
|
||||||
final ShardId followShardId,
|
final ShardId followShardId,
|
||||||
final ShardId leaderShardId,
|
final ShardId leaderShardId,
|
||||||
final int maxBatchOperationCount,
|
final int maxBatchOperationCount,
|
||||||
|
@ -108,7 +108,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
final TimeValue maxRetryDelay,
|
final TimeValue maxRetryDelay,
|
||||||
final TimeValue pollTimeout,
|
final TimeValue pollTimeout,
|
||||||
final Map<String, String> headers) {
|
final Map<String, String> headers) {
|
||||||
this.leaderCluster = leaderCluster;
|
this.remoteCluster = remoteCluster;
|
||||||
this.followShardId = followShardId;
|
this.followShardId = followShardId;
|
||||||
this.leaderShardId = leaderShardId;
|
this.leaderShardId = leaderShardId;
|
||||||
this.maxBatchOperationCount = maxBatchOperationCount;
|
this.maxBatchOperationCount = maxBatchOperationCount;
|
||||||
|
@ -122,7 +122,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShardFollowTask(StreamInput in) throws IOException {
|
public ShardFollowTask(StreamInput in) throws IOException {
|
||||||
this.leaderCluster = in.readString();
|
this.remoteCluster = in.readString();
|
||||||
this.followShardId = ShardId.readShardId(in);
|
this.followShardId = ShardId.readShardId(in);
|
||||||
this.leaderShardId = ShardId.readShardId(in);
|
this.leaderShardId = ShardId.readShardId(in);
|
||||||
this.maxBatchOperationCount = in.readVInt();
|
this.maxBatchOperationCount = in.readVInt();
|
||||||
|
@ -135,8 +135,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
|
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLeaderCluster() {
|
public String getRemoteCluster() {
|
||||||
return leaderCluster;
|
return remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShardId getFollowShardId() {
|
public ShardId getFollowShardId() {
|
||||||
|
@ -190,7 +190,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
out.writeString(leaderCluster);
|
out.writeString(remoteCluster);
|
||||||
followShardId.writeTo(out);
|
followShardId.writeTo(out);
|
||||||
leaderShardId.writeTo(out);
|
leaderShardId.writeTo(out);
|
||||||
out.writeVLong(maxBatchOperationCount);
|
out.writeVLong(maxBatchOperationCount);
|
||||||
|
@ -210,7 +210,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster);
|
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
|
||||||
builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName());
|
builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName());
|
||||||
builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID());
|
builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID());
|
||||||
builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id());
|
builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id());
|
||||||
|
@ -233,7 +233,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
if (this == o) return true;
|
if (this == o) return true;
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
ShardFollowTask that = (ShardFollowTask) o;
|
ShardFollowTask that = (ShardFollowTask) o;
|
||||||
return Objects.equals(leaderCluster, that.leaderCluster) &&
|
return Objects.equals(remoteCluster, that.remoteCluster) &&
|
||||||
Objects.equals(followShardId, that.followShardId) &&
|
Objects.equals(followShardId, that.followShardId) &&
|
||||||
Objects.equals(leaderShardId, that.leaderShardId) &&
|
Objects.equals(leaderShardId, that.leaderShardId) &&
|
||||||
maxBatchOperationCount == that.maxBatchOperationCount &&
|
maxBatchOperationCount == that.maxBatchOperationCount &&
|
||||||
|
@ -249,7 +249,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(
|
return Objects.hash(
|
||||||
leaderCluster,
|
remoteCluster,
|
||||||
followShardId,
|
followShardId,
|
||||||
leaderShardId,
|
leaderShardId,
|
||||||
maxBatchOperationCount,
|
maxBatchOperationCount,
|
||||||
|
|
|
@ -79,8 +79,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
||||||
Map<String, String> headers) {
|
Map<String, String> headers) {
|
||||||
ShardFollowTask params = taskInProgress.getParams();
|
ShardFollowTask params = taskInProgress.getParams();
|
||||||
final Client leaderClient;
|
final Client leaderClient;
|
||||||
if (params.getLeaderCluster() != null) {
|
if (params.getRemoteCluster() != null) {
|
||||||
leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderCluster()), params.getHeaders());
|
leaderClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
|
||||||
} else {
|
} else {
|
||||||
leaderClient = wrapClient(client, params.getHeaders());
|
leaderClient = wrapClient(client, params.getHeaders());
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class TransportPutAutoFollowPatternAction extends
|
||||||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final Client leaderClient = client.getRemoteClusterClient(request.getLeaderCluster());
|
final Client leaderClient = client.getRemoteClusterClient(request.getRemoteCluster());
|
||||||
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||||
clusterStateRequest.clear();
|
clusterStateRequest.clear();
|
||||||
clusterStateRequest.metaData(true);
|
clusterStateRequest.metaData(true);
|
||||||
|
@ -93,7 +93,7 @@ public class TransportPutAutoFollowPatternAction extends
|
||||||
ActionListener.wrap(
|
ActionListener.wrap(
|
||||||
clusterStateResponse -> {
|
clusterStateResponse -> {
|
||||||
final ClusterState leaderClusterState = clusterStateResponse.getState();
|
final ClusterState leaderClusterState = clusterStateResponse.getState();
|
||||||
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderCluster(),
|
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getRemoteCluster(),
|
||||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -157,7 +157,7 @@ public class TransportPutAutoFollowPatternAction extends
|
||||||
}
|
}
|
||||||
|
|
||||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
|
||||||
request.getLeaderCluster(),
|
request.getRemoteCluster(),
|
||||||
request.getLeaderIndexPatterns(),
|
request.getLeaderIndexPatterns(),
|
||||||
request.getFollowIndexNamePattern(),
|
request.getFollowIndexNamePattern(),
|
||||||
request.getMaxBatchOperationCount(),
|
request.getMaxBatchOperationCount(),
|
||||||
|
|
|
@ -95,22 +95,22 @@ public final class TransportPutFollowAction
|
||||||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String leaderCluster = request.getLeaderCluster();
|
String remoteCluster = request.getRemoteCluster();
|
||||||
// Validates whether the leader cluster has been configured properly:
|
// Validates whether the leader cluster has been configured properly:
|
||||||
client.getRemoteClusterClient(leaderCluster);
|
client.getRemoteClusterClient(remoteCluster);
|
||||||
|
|
||||||
String leaderIndex = request.getLeaderIndex();
|
String leaderIndex = request.getLeaderIndex();
|
||||||
createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener);
|
createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createFollowerIndexAndFollowRemoteIndex(
|
private void createFollowerIndexAndFollowRemoteIndex(
|
||||||
final PutFollowAction.Request request,
|
final PutFollowAction.Request request,
|
||||||
final String leaderCluster,
|
final String remoteCluster,
|
||||||
final String leaderIndex,
|
final String leaderIndex,
|
||||||
final ActionListener<PutFollowAction.Response> listener) {
|
final ActionListener<PutFollowAction.Response> listener) {
|
||||||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
|
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
|
||||||
client,
|
client,
|
||||||
leaderCluster,
|
remoteCluster,
|
||||||
leaderIndex,
|
leaderIndex,
|
||||||
listener::onFailure,
|
listener::onFailure,
|
||||||
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
|
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
|
||||||
|
@ -160,7 +160,7 @@ public final class TransportPutFollowAction
|
||||||
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
|
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
|
||||||
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
|
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
|
||||||
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
|
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
|
||||||
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY, request.getLeaderCluster());
|
metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, request.getRemoteCluster());
|
||||||
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
|
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
|
||||||
|
|
||||||
// Copy all settings, but overwrite a few settings.
|
// Copy all settings, but overwrite a few settings.
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
|
||||||
if (ccrMetadata == null) {
|
if (ccrMetadata == null) {
|
||||||
throw new IllegalArgumentException("follow index ["+ request.getFollowerIndex() + "] does not have ccr metadata");
|
throw new IllegalArgumentException("follow index ["+ request.getFollowerIndex() + "] does not have ccr metadata");
|
||||||
}
|
}
|
||||||
final String leaderCluster = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY);
|
final String leaderCluster = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);
|
||||||
// Validates whether the leader cluster has been configured properly:
|
// Validates whether the leader cluster has been configured properly:
|
||||||
client.getRemoteClusterClient(leaderCluster);
|
client.getRemoteClusterClient(leaderCluster);
|
||||||
final String leaderIndex = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
|
final String leaderIndex = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
|
||||||
|
|
|
@ -74,7 +74,7 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
protected PutFollowAction.Request getPutFollowRequest() {
|
protected PutFollowAction.Request getPutFollowRequest() {
|
||||||
PutFollowAction.Request request = new PutFollowAction.Request();
|
PutFollowAction.Request request = new PutFollowAction.Request();
|
||||||
request.setLeaderCluster("local");
|
request.setRemoteCluster("local");
|
||||||
request.setLeaderIndex("leader");
|
request.setLeaderIndex("leader");
|
||||||
request.setFollowRequest(getResumeFollowRequest());
|
request.setFollowRequest(getResumeFollowRequest());
|
||||||
return request;
|
return request;
|
||||||
|
|
|
@ -121,7 +121,7 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
||||||
// Enabling auto following:
|
// Enabling auto following:
|
||||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName("my-pattern");
|
request.setName("my-pattern");
|
||||||
request.setLeaderCluster("leader_cluster");
|
request.setRemoteCluster("leader_cluster");
|
||||||
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||||
// Need to set this, because following an index in the same cluster
|
// Need to set this, because following an index in the same cluster
|
||||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||||
|
@ -228,7 +228,7 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
||||||
private void putAutoFollowPatterns(String name, String[] patterns) {
|
private void putAutoFollowPatterns(String name, String[] patterns) {
|
||||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName(name);
|
request.setName(name);
|
||||||
request.setLeaderCluster("leader_cluster");
|
request.setRemoteCluster("leader_cluster");
|
||||||
request.setLeaderIndexPatterns(Arrays.asList(patterns));
|
request.setLeaderIndexPatterns(Arrays.asList(patterns));
|
||||||
// Need to set this, because following an index in the same cluster
|
// Need to set this, because following an index in the same cluster
|
||||||
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
request.setFollowIndexNamePattern("copy-{{leader_index}}");
|
||||||
|
|
|
@ -118,7 +118,7 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
final PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
final PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName("name");
|
request.setName("name");
|
||||||
request.setLeaderCluster("leader");
|
request.setRemoteCluster("leader");
|
||||||
request.setLeaderIndexPatterns(Collections.singletonList("*"));
|
request.setLeaderIndexPatterns(Collections.singletonList("*"));
|
||||||
client().execute(
|
client().execute(
|
||||||
PutAutoFollowPatternAction.INSTANCE,
|
PutAutoFollowPatternAction.INSTANCE,
|
||||||
|
|
|
@ -640,13 +640,13 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
||||||
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||||
ensureLeaderGreen("index1");
|
ensureLeaderGreen("index1");
|
||||||
PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
||||||
followRequest.setLeaderCluster("another_cluster");
|
followRequest.setRemoteCluster("another_cluster");
|
||||||
Exception e = expectThrows(IllegalArgumentException.class,
|
Exception e = expectThrows(IllegalArgumentException.class,
|
||||||
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest).actionGet());
|
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest).actionGet());
|
||||||
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
|
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
|
||||||
PutAutoFollowPatternAction.Request putAutoFollowRequest = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request putAutoFollowRequest = new PutAutoFollowPatternAction.Request();
|
||||||
putAutoFollowRequest.setName("name");
|
putAutoFollowRequest.setName("name");
|
||||||
putAutoFollowRequest.setLeaderCluster("another_cluster");
|
putAutoFollowRequest.setRemoteCluster("another_cluster");
|
||||||
putAutoFollowRequest.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
putAutoFollowRequest.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||||
e = expectThrows(IllegalArgumentException.class,
|
e = expectThrows(IllegalArgumentException.class,
|
||||||
() -> followerClient().execute(PutAutoFollowPatternAction.INSTANCE, putAutoFollowRequest).actionGet());
|
() -> followerClient().execute(PutAutoFollowPatternAction.INSTANCE, putAutoFollowRequest).actionGet());
|
||||||
|
@ -961,7 +961,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
||||||
|
|
||||||
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
|
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
|
||||||
PutFollowAction.Request request = new PutFollowAction.Request();
|
PutFollowAction.Request request = new PutFollowAction.Request();
|
||||||
request.setLeaderCluster("leader_cluster");
|
request.setRemoteCluster("leader_cluster");
|
||||||
request.setLeaderIndex(leaderIndex);
|
request.setLeaderIndex(leaderIndex);
|
||||||
request.setFollowRequest(resumeFollow(followerIndex));
|
request.setFollowRequest(resumeFollow(followerIndex));
|
||||||
return request;
|
return request;
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
|
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
|
||||||
@Override
|
@Override
|
||||||
void getLeaderClusterState(Map<String, String> headers,
|
void getLeaderClusterState(Map<String, String> headers,
|
||||||
String leaderClusterAlias,
|
String remoteCluster,
|
||||||
BiConsumer<ClusterState, Exception> handler) {
|
BiConsumer<ClusterState, Exception> handler) {
|
||||||
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
|
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
|
||||||
handler.accept(leaderState, null);
|
handler.accept(leaderState, null);
|
||||||
|
@ -95,7 +95,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
Runnable successHandler,
|
Runnable successHandler,
|
||||||
Consumer<Exception> failureHandler) {
|
Consumer<Exception> failureHandler) {
|
||||||
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
|
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
|
||||||
assertThat(followRequest.getLeaderCluster(), equalTo("remote"));
|
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
|
||||||
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
|
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
|
||||||
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
|
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
|
||||||
successHandler.run();
|
successHandler.run();
|
||||||
|
@ -143,7 +143,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
|
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
|
||||||
@Override
|
@Override
|
||||||
void getLeaderClusterState(Map<String, String> headers,
|
void getLeaderClusterState(Map<String, String> headers,
|
||||||
String leaderClusterAlias,
|
String remoteCluster,
|
||||||
BiConsumer<ClusterState, Exception> handler) {
|
BiConsumer<ClusterState, Exception> handler) {
|
||||||
handler.accept(null, failure);
|
handler.accept(null, failure);
|
||||||
}
|
}
|
||||||
|
@ -204,7 +204,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
|
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
|
||||||
@Override
|
@Override
|
||||||
void getLeaderClusterState(Map<String, String> headers,
|
void getLeaderClusterState(Map<String, String> headers,
|
||||||
String leaderClusterAlias,
|
String remoteCluster,
|
||||||
BiConsumer<ClusterState, Exception> handler) {
|
BiConsumer<ClusterState, Exception> handler) {
|
||||||
handler.accept(leaderState, null);
|
handler.accept(leaderState, null);
|
||||||
}
|
}
|
||||||
|
@ -214,7 +214,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
PutFollowAction.Request followRequest,
|
PutFollowAction.Request followRequest,
|
||||||
Runnable successHandler,
|
Runnable successHandler,
|
||||||
Consumer<Exception> failureHandler) {
|
Consumer<Exception> failureHandler) {
|
||||||
assertThat(followRequest.getLeaderCluster(), equalTo("remote"));
|
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
|
||||||
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
|
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
|
||||||
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
|
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
|
||||||
successHandler.run();
|
successHandler.run();
|
||||||
|
@ -267,7 +267,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
|
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
|
||||||
@Override
|
@Override
|
||||||
void getLeaderClusterState(Map<String, String> headers,
|
void getLeaderClusterState(Map<String, String> headers,
|
||||||
String leaderClusterAlias,
|
String remoteCluster,
|
||||||
BiConsumer<ClusterState, Exception> handler) {
|
BiConsumer<ClusterState, Exception> handler) {
|
||||||
handler.accept(leaderState, null);
|
handler.accept(leaderState, null);
|
||||||
}
|
}
|
||||||
|
@ -277,7 +277,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
PutFollowAction.Request followRequest,
|
PutFollowAction.Request followRequest,
|
||||||
Runnable successHandler,
|
Runnable successHandler,
|
||||||
Consumer<Exception> failureHandler) {
|
Consumer<Exception> failureHandler) {
|
||||||
assertThat(followRequest.getLeaderCluster(), equalTo("remote"));
|
assertThat(followRequest.getRemoteCluster(), equalTo("remote"));
|
||||||
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
|
assertThat(followRequest.getLeaderIndex(), equalTo("logs-20190101"));
|
||||||
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
|
assertThat(followRequest.getFollowRequest().getFollowerIndex(), equalTo("logs-20190101"));
|
||||||
failureHandler.accept(failure);
|
failureHandler.accept(failure);
|
||||||
|
|
|
@ -42,7 +42,7 @@ public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContent
|
||||||
protected PutAutoFollowPatternAction.Request createTestInstance() {
|
protected PutAutoFollowPatternAction.Request createTestInstance() {
|
||||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName(randomAlphaOfLength(4));
|
request.setName(randomAlphaOfLength(4));
|
||||||
request.setLeaderCluster(randomAlphaOfLength(4));
|
request.setRemoteCluster(randomAlphaOfLength(4));
|
||||||
request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false)));
|
request.setLeaderIndexPatterns(Arrays.asList(generateRandomStringArray(4, 4, false)));
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
request.setFollowIndexNamePattern(randomAlphaOfLength(4));
|
request.setFollowIndexNamePattern(randomAlphaOfLength(4));
|
||||||
|
@ -80,9 +80,9 @@ public class PutAutoFollowPatternRequestTests extends AbstractStreamableXContent
|
||||||
request.setName("name");
|
request.setName("name");
|
||||||
validationException = request.validate();
|
validationException = request.validate();
|
||||||
assertThat(validationException, notNullValue());
|
assertThat(validationException, notNullValue());
|
||||||
assertThat(validationException.getMessage(), containsString("[leader_cluster] is missing"));
|
assertThat(validationException.getMessage(), containsString("[remote_cluster] is missing"));
|
||||||
|
|
||||||
request.setLeaderCluster("_alias");
|
request.setRemoteCluster("_alias");
|
||||||
validationException = request.validate();
|
validationException = request.validate();
|
||||||
assertThat(validationException, notNullValue());
|
assertThat(validationException, notNullValue());
|
||||||
assertThat(validationException.getMessage(), containsString("[leader_index_patterns] is missing"));
|
assertThat(validationException.getMessage(), containsString("[leader_index_patterns] is missing"));
|
||||||
|
|
|
@ -21,7 +21,7 @@ public class PutFollowActionRequestTests extends AbstractStreamableXContentTestC
|
||||||
@Override
|
@Override
|
||||||
protected PutFollowAction.Request createTestInstance() {
|
protected PutFollowAction.Request createTestInstance() {
|
||||||
PutFollowAction.Request request = new PutFollowAction.Request();
|
PutFollowAction.Request request = new PutFollowAction.Request();
|
||||||
request.setLeaderCluster(randomAlphaOfLength(4));
|
request.setRemoteCluster(randomAlphaOfLength(4));
|
||||||
request.setLeaderIndex(randomAlphaOfLength(4));
|
request.setLeaderIndex(randomAlphaOfLength(4));
|
||||||
request.setFollowRequest(ResumeFollowActionRequestTests.createTestRequest());
|
request.setFollowRequest(ResumeFollowActionRequestTests.createTestRequest());
|
||||||
return request;
|
return request;
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
|
||||||
@Override
|
@Override
|
||||||
protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) {
|
protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) {
|
||||||
assertNotSame(expectedInstance, newInstance);
|
assertNotSame(expectedInstance, newInstance);
|
||||||
assertThat(newInstance.getLeaderCluster(), equalTo(expectedInstance.getLeaderCluster()));
|
assertThat(newInstance.getRemoteCluster(), equalTo(expectedInstance.getRemoteCluster()));
|
||||||
assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex()));
|
assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex()));
|
||||||
assertThat(newInstance.followerIndex(), equalTo(expectedInstance.followerIndex()));
|
assertThat(newInstance.followerIndex(), equalTo(expectedInstance.followerIndex()));
|
||||||
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
|
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
|
||||||
.custom(AutoFollowMetadata.TYPE);
|
.custom(AutoFollowMetadata.TYPE);
|
||||||
assertThat(result.getPatterns().size(), equalTo(1));
|
assertThat(result.getPatterns().size(), equalTo(1));
|
||||||
assertThat(result.getPatterns().get("name2"), notNullValue());
|
assertThat(result.getPatterns().get("name2"), notNullValue());
|
||||||
assertThat(result.getPatterns().get("name2").getLeaderCluster(), equalTo("asia_cluster"));
|
assertThat(result.getPatterns().get("name2").getRemoteCluster(), equalTo("asia_cluster"));
|
||||||
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||||
assertThat(result.getFollowedLeaderIndexUUIDs().get("name2"), notNullValue());
|
assertThat(result.getFollowedLeaderIndexUUIDs().get("name2"), notNullValue());
|
||||||
assertThat(result.getHeaders().size(), equalTo(1));
|
assertThat(result.getHeaders().size(), equalTo(1));
|
||||||
|
|
|
@ -30,7 +30,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||||
public void testInnerPut() {
|
public void testInnerPut() {
|
||||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName("name1");
|
request.setName("name1");
|
||||||
request.setLeaderCluster("eu_cluster");
|
request.setRemoteCluster("eu_cluster");
|
||||||
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||||
|
|
||||||
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||||
|
@ -45,7 +45,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||||
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||||
assertThat(autoFollowMetadata, notNullValue());
|
assertThat(autoFollowMetadata, notNullValue());
|
||||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderCluster(), equalTo("eu_cluster"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getRemoteCluster(), equalTo("eu_cluster"));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(1));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
||||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||||
|
@ -55,7 +55,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||||
public void testInnerPut_existingLeaderIndices() {
|
public void testInnerPut_existingLeaderIndices() {
|
||||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName("name1");
|
request.setName("name1");
|
||||||
request.setLeaderCluster("eu_cluster");
|
request.setRemoteCluster("eu_cluster");
|
||||||
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||||
|
|
||||||
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
|
||||||
|
@ -86,7 +86,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||||
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||||
assertThat(autoFollowMetadata, notNullValue());
|
assertThat(autoFollowMetadata, notNullValue());
|
||||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderCluster(), equalTo("eu_cluster"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getRemoteCluster(), equalTo("eu_cluster"));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(1));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
||||||
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||||
|
@ -96,7 +96,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||||
public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() {
|
public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() {
|
||||||
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
|
||||||
request.setName("name1");
|
request.setName("name1");
|
||||||
request.setLeaderCluster("eu_cluster");
|
request.setRemoteCluster("eu_cluster");
|
||||||
request.setLeaderIndexPatterns(Arrays.asList("logs-*", "transactions-*"));
|
request.setLeaderIndexPatterns(Arrays.asList("logs-*", "transactions-*"));
|
||||||
|
|
||||||
Map<String, AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
|
Map<String, AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
|
||||||
|
@ -133,7 +133,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
|
||||||
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||||
assertThat(autoFollowMetadata, notNullValue());
|
assertThat(autoFollowMetadata, notNullValue());
|
||||||
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderCluster(), equalTo("eu_cluster"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getRemoteCluster(), equalTo("eu_cluster"));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(2));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(2));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
|
||||||
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(1), equalTo("transactions-*"));
|
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(1), equalTo("transactions-*"));
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||||
+ "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() + "\""
|
+ "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() + "\""
|
||||||
+ "},"
|
+ "},"
|
||||||
+ "\"ccr_stats\":{"
|
+ "\"ccr_stats\":{"
|
||||||
+ "\"leader_cluster\":\"leader_cluster\","
|
+ "\"remote_cluster\":\"leader_cluster\","
|
||||||
+ "\"leader_index\":\"leader_index\","
|
+ "\"leader_index\":\"leader_index\","
|
||||||
+ "\"follower_index\":\"follower_index\","
|
+ "\"follower_index\":\"follower_index\","
|
||||||
+ "\"shard_id\":" + shardId + ","
|
+ "\"shard_id\":" + shardId + ","
|
||||||
|
@ -168,7 +168,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||||
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
|
+ "\"number_of_queued_writes\":" + numberOfQueuedWrites + ","
|
||||||
+ "\"mapping_version\":" + mappingVersion + ","
|
+ "\"mapping_version\":" + mappingVersion + ","
|
||||||
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
|
+ "\"total_fetch_time_millis\":" + totalFetchTimeMillis + ","
|
||||||
+ "\"total_fetch_leader_time_millis\":" + totalFetchTookTimeMillis + ","
|
+ "\"total_fetch_remote_time_millis\":" + totalFetchTookTimeMillis + ","
|
||||||
+ "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
|
+ "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + ","
|
||||||
+ "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
|
+ "\"number_of_failed_fetches\":" + numberOfFailedFetches + ","
|
||||||
+ "\"operations_received\":" + operationsReceived + ","
|
+ "\"operations_received\":" + operationsReceived + ","
|
||||||
|
@ -197,7 +197,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
|
||||||
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions =
|
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions =
|
||||||
new TreeMap<>(Collections.singletonMap(1L, Tuple.tuple(2, new ElasticsearchException("shard is sad"))));
|
new TreeMap<>(Collections.singletonMap(1L, Tuple.tuple(2, new ElasticsearchException("shard is sad"))));
|
||||||
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
|
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
|
||||||
"leader_cluster",
|
"remote_cluster",
|
||||||
"leader_index",
|
"leader_index",
|
||||||
"follower_index",
|
"follower_index",
|
||||||
0,
|
0,
|
||||||
|
|
|
@ -175,7 +175,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
|
|
||||||
public static class AutoFollowPattern implements Writeable, ToXContentObject {
|
public static class AutoFollowPattern implements Writeable, ToXContentObject {
|
||||||
|
|
||||||
public static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster");
|
public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
|
||||||
public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns");
|
public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns");
|
||||||
public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern");
|
public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern");
|
||||||
public static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count");
|
public static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count");
|
||||||
|
@ -194,7 +194,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
(TimeValue) args[9]));
|
(TimeValue) args[9]));
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_CLUSTER_FIELD);
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
|
||||||
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
|
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
|
||||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD);
|
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD);
|
||||||
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT);
|
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT);
|
||||||
|
@ -214,7 +214,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
POLL_TIMEOUT, ObjectParser.ValueType.STRING);
|
POLL_TIMEOUT, ObjectParser.ValueType.STRING);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String leaderCluster;
|
private final String remoteCluster;
|
||||||
private final List<String> leaderIndexPatterns;
|
private final List<String> leaderIndexPatterns;
|
||||||
private final String followIndexPattern;
|
private final String followIndexPattern;
|
||||||
private final Integer maxBatchOperationCount;
|
private final Integer maxBatchOperationCount;
|
||||||
|
@ -225,7 +225,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
private final TimeValue maxRetryDelay;
|
private final TimeValue maxRetryDelay;
|
||||||
private final TimeValue pollTimeout;
|
private final TimeValue pollTimeout;
|
||||||
|
|
||||||
public AutoFollowPattern(String leaderCluster,
|
public AutoFollowPattern(String remoteCluster,
|
||||||
List<String> leaderIndexPatterns,
|
List<String> leaderIndexPatterns,
|
||||||
String followIndexPattern,
|
String followIndexPattern,
|
||||||
Integer maxBatchOperationCount,
|
Integer maxBatchOperationCount,
|
||||||
|
@ -235,7 +235,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
Integer maxWriteBufferSize,
|
Integer maxWriteBufferSize,
|
||||||
TimeValue maxRetryDelay,
|
TimeValue maxRetryDelay,
|
||||||
TimeValue pollTimeout) {
|
TimeValue pollTimeout) {
|
||||||
this.leaderCluster = leaderCluster;
|
this.remoteCluster = remoteCluster;
|
||||||
this.leaderIndexPatterns = leaderIndexPatterns;
|
this.leaderIndexPatterns = leaderIndexPatterns;
|
||||||
this.followIndexPattern = followIndexPattern;
|
this.followIndexPattern = followIndexPattern;
|
||||||
this.maxBatchOperationCount = maxBatchOperationCount;
|
this.maxBatchOperationCount = maxBatchOperationCount;
|
||||||
|
@ -248,7 +248,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
}
|
}
|
||||||
|
|
||||||
public AutoFollowPattern(StreamInput in) throws IOException {
|
public AutoFollowPattern(StreamInput in) throws IOException {
|
||||||
leaderCluster = in.readString();
|
remoteCluster = in.readString();
|
||||||
leaderIndexPatterns = in.readList(StreamInput::readString);
|
leaderIndexPatterns = in.readList(StreamInput::readString);
|
||||||
followIndexPattern = in.readOptionalString();
|
followIndexPattern = in.readOptionalString();
|
||||||
maxBatchOperationCount = in.readOptionalVInt();
|
maxBatchOperationCount = in.readOptionalVInt();
|
||||||
|
@ -268,8 +268,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
return Regex.simpleMatch(leaderIndexPatterns, indexName);
|
return Regex.simpleMatch(leaderIndexPatterns, indexName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLeaderCluster() {
|
public String getRemoteCluster() {
|
||||||
return leaderCluster;
|
return remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getLeaderIndexPatterns() {
|
public List<String> getLeaderIndexPatterns() {
|
||||||
|
@ -310,7 +310,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
out.writeString(leaderCluster);
|
out.writeString(remoteCluster);
|
||||||
out.writeStringList(leaderIndexPatterns);
|
out.writeStringList(leaderIndexPatterns);
|
||||||
out.writeOptionalString(followIndexPattern);
|
out.writeOptionalString(followIndexPattern);
|
||||||
out.writeOptionalVInt(maxBatchOperationCount);
|
out.writeOptionalVInt(maxBatchOperationCount);
|
||||||
|
@ -324,7 +324,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster);
|
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
|
||||||
builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0]));
|
builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0]));
|
||||||
if (followIndexPattern != null) {
|
if (followIndexPattern != null) {
|
||||||
builder.field(FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexPattern);
|
builder.field(FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexPattern);
|
||||||
|
@ -363,7 +363,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
if (this == o) return true;
|
if (this == o) return true;
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
AutoFollowPattern that = (AutoFollowPattern) o;
|
AutoFollowPattern that = (AutoFollowPattern) o;
|
||||||
return Objects.equals(leaderCluster, that.leaderCluster) &&
|
return Objects.equals(remoteCluster, that.remoteCluster) &&
|
||||||
Objects.equals(leaderIndexPatterns, that.leaderIndexPatterns) &&
|
Objects.equals(leaderIndexPatterns, that.leaderIndexPatterns) &&
|
||||||
Objects.equals(followIndexPattern, that.followIndexPattern) &&
|
Objects.equals(followIndexPattern, that.followIndexPattern) &&
|
||||||
Objects.equals(maxBatchOperationCount, that.maxBatchOperationCount) &&
|
Objects.equals(maxBatchOperationCount, that.maxBatchOperationCount) &&
|
||||||
|
@ -378,7 +378,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(
|
return Objects.hash(
|
||||||
leaderCluster,
|
remoteCluster,
|
||||||
leaderIndexPatterns,
|
leaderIndexPatterns,
|
||||||
followIndexPattern,
|
followIndexPattern,
|
||||||
maxBatchOperationCount,
|
maxBatchOperationCount,
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
|
|
||||||
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
|
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
|
||||||
|
|
||||||
private static final ParseField LEADER_CLUSTER = new ParseField("leader_cluster");
|
private static final ParseField LEADER_CLUSTER = new ParseField("remote_cluster");
|
||||||
private static final ParseField LEADER_INDEX = new ParseField("leader_index");
|
private static final ParseField LEADER_INDEX = new ParseField("leader_index");
|
||||||
private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index");
|
private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index");
|
||||||
private static final ParseField SHARD_ID = new ParseField("shard_id");
|
private static final ParseField SHARD_ID = new ParseField("shard_id");
|
||||||
|
@ -48,7 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
|
private static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes");
|
||||||
private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version");
|
private static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version");
|
||||||
private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
|
private static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis");
|
||||||
private static final ParseField TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD = new ParseField("total_fetch_leader_time_millis");
|
private static final ParseField TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD = new ParseField("total_fetch_remote_time_millis");
|
||||||
private static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches");
|
private static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches");
|
||||||
private static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches");
|
private static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches");
|
||||||
private static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received");
|
private static final ParseField OPERATIONS_RECEIVED_FIELD = new ParseField("operations_received");
|
||||||
|
@ -118,7 +118,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
|
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD);
|
||||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD);
|
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD);
|
||||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
|
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD);
|
||||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD);
|
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD);
|
||||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
|
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD);
|
||||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
|
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD);
|
||||||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
|
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD);
|
||||||
|
@ -147,10 +147,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
|
FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String leaderCluster;
|
private final String remoteCluster;
|
||||||
|
|
||||||
public String getLeaderCluster() {
|
public String getRemoteCluster() {
|
||||||
return leaderCluster;
|
return remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String leaderIndex;
|
private final String leaderIndex;
|
||||||
|
@ -231,10 +231,10 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
return totalFetchTimeMillis;
|
return totalFetchTimeMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final long totalFetchLeaderTimeMillis;
|
private final long totalFetchRemoteTimeMillis;
|
||||||
|
|
||||||
public long totalFetchLeaderTimeMillis() {
|
public long totalFetchRemoteTimeMillis() {
|
||||||
return totalFetchLeaderTimeMillis;
|
return totalFetchRemoteTimeMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final long numberOfSuccessfulFetches;
|
private final long numberOfSuccessfulFetches;
|
||||||
|
@ -304,7 +304,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShardFollowNodeTaskStatus(
|
public ShardFollowNodeTaskStatus(
|
||||||
final String leaderCluster,
|
final String remoteCluster,
|
||||||
final String leaderIndex,
|
final String leaderIndex,
|
||||||
final String followerIndex,
|
final String followerIndex,
|
||||||
final int shardId,
|
final int shardId,
|
||||||
|
@ -318,7 +318,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
final int numberOfQueuedWrites,
|
final int numberOfQueuedWrites,
|
||||||
final long mappingVersion,
|
final long mappingVersion,
|
||||||
final long totalFetchTimeMillis,
|
final long totalFetchTimeMillis,
|
||||||
final long totalFetchLeaderTimeMillis,
|
final long totalFetchRemoteTimeMillis,
|
||||||
final long numberOfSuccessfulFetches,
|
final long numberOfSuccessfulFetches,
|
||||||
final long numberOfFailedFetches,
|
final long numberOfFailedFetches,
|
||||||
final long operationsReceived,
|
final long operationsReceived,
|
||||||
|
@ -330,7 +330,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
|
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
|
||||||
final long timeSinceLastFetchMillis,
|
final long timeSinceLastFetchMillis,
|
||||||
final ElasticsearchException fatalException) {
|
final ElasticsearchException fatalException) {
|
||||||
this.leaderCluster = leaderCluster;
|
this.remoteCluster = remoteCluster;
|
||||||
this.leaderIndex = leaderIndex;
|
this.leaderIndex = leaderIndex;
|
||||||
this.followerIndex = followerIndex;
|
this.followerIndex = followerIndex;
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
|
@ -344,7 +344,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
this.numberOfQueuedWrites = numberOfQueuedWrites;
|
this.numberOfQueuedWrites = numberOfQueuedWrites;
|
||||||
this.mappingVersion = mappingVersion;
|
this.mappingVersion = mappingVersion;
|
||||||
this.totalFetchTimeMillis = totalFetchTimeMillis;
|
this.totalFetchTimeMillis = totalFetchTimeMillis;
|
||||||
this.totalFetchLeaderTimeMillis = totalFetchLeaderTimeMillis;
|
this.totalFetchRemoteTimeMillis = totalFetchRemoteTimeMillis;
|
||||||
this.numberOfSuccessfulFetches = numberOfSuccessfulFetches;
|
this.numberOfSuccessfulFetches = numberOfSuccessfulFetches;
|
||||||
this.numberOfFailedFetches = numberOfFailedFetches;
|
this.numberOfFailedFetches = numberOfFailedFetches;
|
||||||
this.operationsReceived = operationsReceived;
|
this.operationsReceived = operationsReceived;
|
||||||
|
@ -359,7 +359,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
|
public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
|
||||||
this.leaderCluster = in.readOptionalString();
|
this.remoteCluster = in.readOptionalString();
|
||||||
this.leaderIndex = in.readString();
|
this.leaderIndex = in.readString();
|
||||||
this.followerIndex = in.readString();
|
this.followerIndex = in.readString();
|
||||||
this.shardId = in.readVInt();
|
this.shardId = in.readVInt();
|
||||||
|
@ -373,7 +373,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
this.numberOfQueuedWrites = in.readVInt();
|
this.numberOfQueuedWrites = in.readVInt();
|
||||||
this.mappingVersion = in.readVLong();
|
this.mappingVersion = in.readVLong();
|
||||||
this.totalFetchTimeMillis = in.readVLong();
|
this.totalFetchTimeMillis = in.readVLong();
|
||||||
this.totalFetchLeaderTimeMillis = in.readVLong();
|
this.totalFetchRemoteTimeMillis = in.readVLong();
|
||||||
this.numberOfSuccessfulFetches = in.readVLong();
|
this.numberOfSuccessfulFetches = in.readVLong();
|
||||||
this.numberOfFailedFetches = in.readVLong();
|
this.numberOfFailedFetches = in.readVLong();
|
||||||
this.operationsReceived = in.readVLong();
|
this.operationsReceived = in.readVLong();
|
||||||
|
@ -395,7 +395,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(final StreamOutput out) throws IOException {
|
public void writeTo(final StreamOutput out) throws IOException {
|
||||||
out.writeOptionalString(leaderCluster);
|
out.writeOptionalString(remoteCluster);
|
||||||
out.writeString(leaderIndex);
|
out.writeString(leaderIndex);
|
||||||
out.writeString(followerIndex);
|
out.writeString(followerIndex);
|
||||||
out.writeVInt(shardId);
|
out.writeVInt(shardId);
|
||||||
|
@ -409,7 +409,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
out.writeVInt(numberOfQueuedWrites);
|
out.writeVInt(numberOfQueuedWrites);
|
||||||
out.writeVLong(mappingVersion);
|
out.writeVLong(mappingVersion);
|
||||||
out.writeVLong(totalFetchTimeMillis);
|
out.writeVLong(totalFetchTimeMillis);
|
||||||
out.writeVLong(totalFetchLeaderTimeMillis);
|
out.writeVLong(totalFetchRemoteTimeMillis);
|
||||||
out.writeVLong(numberOfSuccessfulFetches);
|
out.writeVLong(numberOfSuccessfulFetches);
|
||||||
out.writeVLong(numberOfFailedFetches);
|
out.writeVLong(numberOfFailedFetches);
|
||||||
out.writeVLong(operationsReceived);
|
out.writeVLong(operationsReceived);
|
||||||
|
@ -440,7 +440,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
}
|
}
|
||||||
|
|
||||||
public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
|
public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
|
||||||
builder.field(LEADER_CLUSTER.getPreferredName(), leaderCluster);
|
builder.field(LEADER_CLUSTER.getPreferredName(), remoteCluster);
|
||||||
builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
|
builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
|
||||||
builder.field(FOLLOWER_INDEX.getPreferredName(), followerIndex);
|
builder.field(FOLLOWER_INDEX.getPreferredName(), followerIndex);
|
||||||
builder.field(SHARD_ID.getPreferredName(), shardId);
|
builder.field(SHARD_ID.getPreferredName(), shardId);
|
||||||
|
@ -458,9 +458,9 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
"total_fetch_time",
|
"total_fetch_time",
|
||||||
new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS));
|
new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS));
|
||||||
builder.humanReadableField(
|
builder.humanReadableField(
|
||||||
TOTAL_FETCH_LEADER_TIME_MILLIS_FIELD.getPreferredName(),
|
TOTAL_FETCH_REMOTE_TIME_MILLIS_FIELD.getPreferredName(),
|
||||||
"total_fetch_leader_time",
|
"total_fetch_leader_time",
|
||||||
new TimeValue(totalFetchLeaderTimeMillis, TimeUnit.MILLISECONDS));
|
new TimeValue(totalFetchRemoteTimeMillis, TimeUnit.MILLISECONDS));
|
||||||
builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches);
|
builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches);
|
||||||
builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches);
|
builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches);
|
||||||
builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived);
|
builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived);
|
||||||
|
@ -519,7 +519,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
|
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
|
||||||
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
|
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
|
||||||
String otherFatalExceptionMessage = that.fatalException != null ? that.fatalException.getMessage() : null;
|
String otherFatalExceptionMessage = that.fatalException != null ? that.fatalException.getMessage() : null;
|
||||||
return leaderCluster.equals(that.leaderCluster) &&
|
return remoteCluster.equals(that.remoteCluster) &&
|
||||||
leaderIndex.equals(that.leaderIndex) &&
|
leaderIndex.equals(that.leaderIndex) &&
|
||||||
followerIndex.equals(that.followerIndex) &&
|
followerIndex.equals(that.followerIndex) &&
|
||||||
shardId == that.shardId &&
|
shardId == that.shardId &&
|
||||||
|
@ -533,7 +533,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
numberOfQueuedWrites == that.numberOfQueuedWrites &&
|
numberOfQueuedWrites == that.numberOfQueuedWrites &&
|
||||||
mappingVersion == that.mappingVersion &&
|
mappingVersion == that.mappingVersion &&
|
||||||
totalFetchTimeMillis == that.totalFetchTimeMillis &&
|
totalFetchTimeMillis == that.totalFetchTimeMillis &&
|
||||||
totalFetchLeaderTimeMillis == that.totalFetchLeaderTimeMillis &&
|
totalFetchRemoteTimeMillis == that.totalFetchRemoteTimeMillis &&
|
||||||
numberOfSuccessfulFetches == that.numberOfSuccessfulFetches &&
|
numberOfSuccessfulFetches == that.numberOfSuccessfulFetches &&
|
||||||
numberOfFailedFetches == that.numberOfFailedFetches &&
|
numberOfFailedFetches == that.numberOfFailedFetches &&
|
||||||
operationsReceived == that.operationsReceived &&
|
operationsReceived == that.operationsReceived &&
|
||||||
|
@ -556,7 +556,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
|
String fatalExceptionMessage = fatalException != null ? fatalException.getMessage() : null;
|
||||||
return Objects.hash(
|
return Objects.hash(
|
||||||
leaderCluster,
|
remoteCluster,
|
||||||
leaderIndex,
|
leaderIndex,
|
||||||
followerIndex,
|
followerIndex,
|
||||||
shardId,
|
shardId,
|
||||||
|
@ -570,7 +570,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
|
||||||
numberOfQueuedWrites,
|
numberOfQueuedWrites,
|
||||||
mappingVersion,
|
mappingVersion,
|
||||||
totalFetchTimeMillis,
|
totalFetchTimeMillis,
|
||||||
totalFetchLeaderTimeMillis,
|
totalFetchRemoteTimeMillis,
|
||||||
numberOfSuccessfulFetches,
|
numberOfSuccessfulFetches,
|
||||||
numberOfFailedFetches,
|
numberOfFailedFetches,
|
||||||
operationsReceived,
|
operationsReceived,
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||||
|
import static org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern.REMOTE_CLUSTER_FIELD;
|
||||||
|
|
||||||
public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
|
|
||||||
|
@ -48,7 +49,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(Request::setName, NAME_FIELD);
|
PARSER.declareString(Request::setName, NAME_FIELD);
|
||||||
PARSER.declareString(Request::setLeaderCluster, AutoFollowPattern.LEADER_CLUSTER_FIELD);
|
PARSER.declareString(Request::setRemoteCluster, REMOTE_CLUSTER_FIELD);
|
||||||
PARSER.declareStringArray(Request::setLeaderIndexPatterns, AutoFollowPattern.LEADER_PATTERNS_FIELD);
|
PARSER.declareStringArray(Request::setLeaderIndexPatterns, AutoFollowPattern.LEADER_PATTERNS_FIELD);
|
||||||
PARSER.declareString(Request::setFollowIndexNamePattern, AutoFollowPattern.FOLLOW_PATTERN_FIELD);
|
PARSER.declareString(Request::setFollowIndexNamePattern, AutoFollowPattern.FOLLOW_PATTERN_FIELD);
|
||||||
PARSER.declareInt(Request::setMaxBatchOperationCount, AutoFollowPattern.MAX_BATCH_OPERATION_COUNT);
|
PARSER.declareInt(Request::setMaxBatchOperationCount, AutoFollowPattern.MAX_BATCH_OPERATION_COUNT);
|
||||||
|
@ -83,7 +84,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String name;
|
private String name;
|
||||||
private String leaderCluster;
|
private String remoteCluster;
|
||||||
private List<String> leaderIndexPatterns;
|
private List<String> leaderIndexPatterns;
|
||||||
private String followIndexNamePattern;
|
private String followIndexNamePattern;
|
||||||
|
|
||||||
|
@ -101,8 +102,8 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] is missing", validationException);
|
validationException = addValidationError("[" + NAME_FIELD.getPreferredName() + "] is missing", validationException);
|
||||||
}
|
}
|
||||||
if (leaderCluster == null) {
|
if (remoteCluster == null) {
|
||||||
validationException = addValidationError("[" + AutoFollowPattern.LEADER_CLUSTER_FIELD.getPreferredName() +
|
validationException = addValidationError("[" + REMOTE_CLUSTER_FIELD.getPreferredName() +
|
||||||
"] is missing", validationException);
|
"] is missing", validationException);
|
||||||
}
|
}
|
||||||
if (leaderIndexPatterns == null || leaderIndexPatterns.isEmpty()) {
|
if (leaderIndexPatterns == null || leaderIndexPatterns.isEmpty()) {
|
||||||
|
@ -133,12 +134,12 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLeaderCluster() {
|
public String getRemoteCluster() {
|
||||||
return leaderCluster;
|
return remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLeaderCluster(String leaderCluster) {
|
public void setRemoteCluster(String remoteCluster) {
|
||||||
this.leaderCluster = leaderCluster;
|
this.remoteCluster = remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getLeaderIndexPatterns() {
|
public List<String> getLeaderIndexPatterns() {
|
||||||
|
@ -217,7 +218,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
name = in.readString();
|
name = in.readString();
|
||||||
leaderCluster = in.readString();
|
remoteCluster = in.readString();
|
||||||
leaderIndexPatterns = in.readList(StreamInput::readString);
|
leaderIndexPatterns = in.readList(StreamInput::readString);
|
||||||
followIndexNamePattern = in.readOptionalString();
|
followIndexNamePattern = in.readOptionalString();
|
||||||
maxBatchOperationCount = in.readOptionalVInt();
|
maxBatchOperationCount = in.readOptionalVInt();
|
||||||
|
@ -233,7 +234,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeString(name);
|
out.writeString(name);
|
||||||
out.writeString(leaderCluster);
|
out.writeString(remoteCluster);
|
||||||
out.writeStringList(leaderIndexPatterns);
|
out.writeStringList(leaderIndexPatterns);
|
||||||
out.writeOptionalString(followIndexNamePattern);
|
out.writeOptionalString(followIndexNamePattern);
|
||||||
out.writeOptionalVInt(maxBatchOperationCount);
|
out.writeOptionalVInt(maxBatchOperationCount);
|
||||||
|
@ -250,7 +251,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
{
|
{
|
||||||
builder.field(NAME_FIELD.getPreferredName(), name);
|
builder.field(NAME_FIELD.getPreferredName(), name);
|
||||||
builder.field(AutoFollowPattern.LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster);
|
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
|
||||||
builder.field(AutoFollowPattern.LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns);
|
builder.field(AutoFollowPattern.LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns);
|
||||||
if (followIndexNamePattern != null) {
|
if (followIndexNamePattern != null) {
|
||||||
builder.field(AutoFollowPattern.FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexNamePattern);
|
builder.field(AutoFollowPattern.FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexNamePattern);
|
||||||
|
@ -287,7 +288,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
Request request = (Request) o;
|
Request request = (Request) o;
|
||||||
return Objects.equals(name, request.name) &&
|
return Objects.equals(name, request.name) &&
|
||||||
Objects.equals(leaderCluster, request.leaderCluster) &&
|
Objects.equals(remoteCluster, request.remoteCluster) &&
|
||||||
Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) &&
|
Objects.equals(leaderIndexPatterns, request.leaderIndexPatterns) &&
|
||||||
Objects.equals(followIndexNamePattern, request.followIndexNamePattern) &&
|
Objects.equals(followIndexNamePattern, request.followIndexNamePattern) &&
|
||||||
Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) &&
|
Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) &&
|
||||||
|
@ -303,7 +304,7 @@ public class PutAutoFollowPatternAction extends Action<AcknowledgedResponse> {
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(
|
return Objects.hash(
|
||||||
name,
|
name,
|
||||||
leaderCluster,
|
remoteCluster,
|
||||||
leaderIndexPatterns,
|
leaderIndexPatterns,
|
||||||
followIndexNamePattern,
|
followIndexNamePattern,
|
||||||
maxBatchOperationCount,
|
maxBatchOperationCount,
|
||||||
|
|
|
@ -51,7 +51,7 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
|
|
||||||
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest, ToXContentObject {
|
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest, ToXContentObject {
|
||||||
|
|
||||||
private static final ParseField LEADER_CLUSTER_FIELD = new ParseField("leader_cluster");
|
private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
|
||||||
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
|
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
|
||||||
|
|
||||||
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, () -> {
|
private static final ObjectParser<Request, String> PARSER = new ObjectParser<>(NAME, () -> {
|
||||||
|
@ -61,7 +61,7 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
});
|
});
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(Request::setLeaderCluster, LEADER_CLUSTER_FIELD);
|
PARSER.declareString(Request::setRemoteCluster, REMOTE_CLUSTER_FIELD);
|
||||||
PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD);
|
PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD);
|
||||||
PARSER.declareString((request, value) -> request.followRequest.setFollowerIndex(value), FOLLOWER_INDEX_FIELD);
|
PARSER.declareString((request, value) -> request.followRequest.setFollowerIndex(value), FOLLOWER_INDEX_FIELD);
|
||||||
PARSER.declareInt((request, value) -> request.followRequest.setMaxBatchOperationCount(value), MAX_BATCH_OPERATION_COUNT);
|
PARSER.declareInt((request, value) -> request.followRequest.setMaxBatchOperationCount(value), MAX_BATCH_OPERATION_COUNT);
|
||||||
|
@ -99,19 +99,19 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String leaderCluster;
|
private String remoteCluster;
|
||||||
private String leaderIndex;
|
private String leaderIndex;
|
||||||
private ResumeFollowAction.Request followRequest;
|
private ResumeFollowAction.Request followRequest;
|
||||||
|
|
||||||
public Request() {
|
public Request() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLeaderCluster() {
|
public String getRemoteCluster() {
|
||||||
return leaderCluster;
|
return remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLeaderCluster(String leaderCluster) {
|
public void setRemoteCluster(String remoteCluster) {
|
||||||
this.leaderCluster = leaderCluster;
|
this.remoteCluster = remoteCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getLeaderIndex() {
|
public String getLeaderIndex() {
|
||||||
|
@ -133,8 +133,8 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
@Override
|
@Override
|
||||||
public ActionRequestValidationException validate() {
|
public ActionRequestValidationException validate() {
|
||||||
ActionRequestValidationException e = followRequest.validate();
|
ActionRequestValidationException e = followRequest.validate();
|
||||||
if (leaderCluster == null) {
|
if (remoteCluster == null) {
|
||||||
e = addValidationError(LEADER_CLUSTER_FIELD.getPreferredName() + " is missing", e);
|
e = addValidationError(REMOTE_CLUSTER_FIELD.getPreferredName() + " is missing", e);
|
||||||
}
|
}
|
||||||
if (leaderIndex == null) {
|
if (leaderIndex == null) {
|
||||||
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
|
e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e);
|
||||||
|
@ -155,7 +155,7 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
leaderCluster = in.readString();
|
remoteCluster = in.readString();
|
||||||
leaderIndex = in.readString();
|
leaderIndex = in.readString();
|
||||||
followRequest = new ResumeFollowAction.Request();
|
followRequest = new ResumeFollowAction.Request();
|
||||||
followRequest.readFrom(in);
|
followRequest.readFrom(in);
|
||||||
|
@ -164,7 +164,7 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeString(leaderCluster);
|
out.writeString(remoteCluster);
|
||||||
out.writeString(leaderIndex);
|
out.writeString(leaderIndex);
|
||||||
followRequest.writeTo(out);
|
followRequest.writeTo(out);
|
||||||
}
|
}
|
||||||
|
@ -173,7 +173,7 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
{
|
{
|
||||||
builder.field(LEADER_CLUSTER_FIELD.getPreferredName(), leaderCluster);
|
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
|
||||||
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
|
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
|
||||||
followRequest.toXContentFragment(builder, params);
|
followRequest.toXContentFragment(builder, params);
|
||||||
}
|
}
|
||||||
|
@ -186,14 +186,14 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
|
||||||
if (this == o) return true;
|
if (this == o) return true;
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
Request request = (Request) o;
|
Request request = (Request) o;
|
||||||
return Objects.equals(leaderCluster, request.leaderCluster) &&
|
return Objects.equals(remoteCluster, request.remoteCluster) &&
|
||||||
Objects.equals(leaderIndex, request.leaderIndex) &&
|
Objects.equals(leaderIndex, request.leaderIndex) &&
|
||||||
Objects.equals(followRequest, request.followRequest);
|
Objects.equals(followRequest, request.followRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(leaderCluster, leaderIndex, followRequest);
|
return Objects.hash(remoteCluster, leaderIndex, followRequest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -929,7 +929,7 @@
|
||||||
},
|
},
|
||||||
"ccr_stats": {
|
"ccr_stats": {
|
||||||
"properties": {
|
"properties": {
|
||||||
"leader_cluster": {
|
"remote_cluster": {
|
||||||
"type": "keyword"
|
"type": "keyword"
|
||||||
},
|
},
|
||||||
"leader_index": {
|
"leader_index": {
|
||||||
|
@ -971,7 +971,7 @@
|
||||||
"total_fetch_time_millis": {
|
"total_fetch_time_millis": {
|
||||||
"type": "long"
|
"type": "long"
|
||||||
},
|
},
|
||||||
"total_fetch_leader_time_millis": {
|
"total_fetch_remote_time_millis": {
|
||||||
"type": "long"
|
"type": "long"
|
||||||
},
|
},
|
||||||
"number_of_successful_fetches": {
|
"number_of_successful_fetches": {
|
||||||
|
|
Loading…
Reference in New Issue