Allow follower indices to override leader settings (#58103)

Today when creating a follower index via the put follow API, or via an
auto-follow pattern, it is not possible to specify settings overrides
for the follower index. Instead, we copy all of the leader index
settings to the follower. Yet, there are cases where a user would want
some different settings on the follower index such as the number of
replicas, or allocation settings. This commit addresses this by allowing
the user to specify settings overrides when creating follower index via
manual put follower calls, or via auto-follow patterns. Note that not
all settings can be overrode (e.g., index.number_of_shards) so we also
have detection that prevents attempting to override settings that must
be equal between the leader and follow index. Note that we do not even
allow specifying such settings in the overrides, even if they are
specified to be equal between the leader and the follower
index. Instead, the must be implicitly copied from the leader index, not
explicitly set by the user.
This commit is contained in:
Jason Tedor 2020-06-18 10:55:17 -04:00
parent 9ba1b1d067
commit be08268562
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
32 changed files with 1143 additions and 131 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
@ -32,6 +33,7 @@ import java.util.Objects;
public class FollowConfig {
static final ParseField SETTINGS = new ParseField("settings");
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");
static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size");
static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests");
@ -49,6 +51,7 @@ public class FollowConfig {
FollowConfig::new);
static {
PARSER.declareObject(FollowConfig::setSettings, (p, c) -> Settings.fromXContent(p), SETTINGS);
PARSER.declareInt(FollowConfig::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareInt(FollowConfig::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareField(
@ -81,6 +84,7 @@ public class FollowConfig {
return PARSER.apply(parser, null);
}
private Settings settings = Settings.EMPTY;
private Integer maxReadRequestOperationCount;
private Integer maxOutstandingReadRequests;
private ByteSizeValue maxReadRequestSize;
@ -95,6 +99,14 @@ public class FollowConfig {
FollowConfig() {
}
public Settings getSettings() {
return settings;
}
public void setSettings(final Settings settings) {
this.settings = Objects.requireNonNull(settings);
}
public Integer getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
}
@ -176,6 +188,13 @@ public class FollowConfig {
}
void toXContentFragment(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (settings.isEmpty() == false) {
builder.startObject(SETTINGS.getPreferredName());
{
settings.toXContent(builder, params);
}
builder.endObject();
}
if (maxReadRequestOperationCount != null) {
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@ -98,6 +99,7 @@ public final class GetAutoFollowPatternResponse {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), PutAutoFollowPatternRequest.LEADER_PATTERNS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), PutAutoFollowPatternRequest.FOLLOW_PATTERN_FIELD);
PARSER.declareObject(Pattern::setSettings, (p, c) -> Settings.fromXContent(p), PutAutoFollowPatternRequest.SETTINGS);
PARSER.declareInt(Pattern::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
Pattern::setMaxReadRequestSize,

View File

@ -20,6 +20,8 @@
package org.elasticsearch.client;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -48,6 +50,8 @@ import org.elasticsearch.client.core.BroadcastResponse;
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.test.rest.yaml.ObjectPath;
@ -61,6 +65,7 @@ import java.util.Map;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -80,6 +85,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
assertThat(response.isAcknowledged(), is(true));
PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE);
putFollowRequest.setSettings(Settings.builder().put("index.number_of_replicas", 0L).build());
PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@ -118,6 +124,13 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
SearchRequest followerSearchRequest = new SearchRequest("follower");
SearchResponse followerSearchResponse = highLevelClient().search(followerSearchRequest, RequestOptions.DEFAULT);
assertThat(followerSearchResponse.getHits().getTotalHits().value, equalTo(1L));
GetSettingsRequest followerSettingsRequest = new GetSettingsRequest().indices("follower");
GetSettingsResponse followerSettingsResponse =
highLevelClient().indices().getSettings(followerSettingsRequest, RequestOptions.DEFAULT);
assertThat(
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(followerSettingsResponse.getIndexToSettings().get("follower")),
equalTo(0));
});
} catch (Exception e) {
IndicesFollowStats followStats = ccrClient.getCcrStats(new CcrStatsRequest(), RequestOptions.DEFAULT).getIndicesFollowStats();
@ -245,6 +258,10 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
PutAutoFollowPatternRequest putAutoFollowPatternRequest =
new PutAutoFollowPatternRequest("pattern1", "local_cluster", Collections.singletonList("logs-*"));
putAutoFollowPatternRequest.setFollowIndexNamePattern("copy-{{leader_index}}");
final int followerNumberOfReplicas = randomIntBetween(0, 4);
final Settings autoFollowerPatternSettings =
Settings.builder().put("index.number_of_replicas", followerNumberOfReplicas).build();
putAutoFollowPatternRequest.setSettings(autoFollowerPatternSettings);
AcknowledgedResponse putAutoFollowPatternResponse =
execute(putAutoFollowPatternRequest, ccrClient::putAutoFollowPattern, ccrClient::putAutoFollowPatternAsync);
assertThat(putAutoFollowPatternResponse.isAcknowledged(), is(true));
@ -260,6 +277,9 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
assertThat(ccrStatsResponse.getIndicesFollowStats().getShardFollowStats("copy-logs-20200101"), notNullValue());
});
assertThat(indexExists("copy-logs-20200101"), is(true));
assertThat(
getIndexSettingsAsMap("copy-logs-20200101"),
hasEntry("index.number_of_replicas", Integer.toString(followerNumberOfReplicas)));
GetAutoFollowPatternRequest getAutoFollowPatternRequest =
randomBoolean() ? new GetAutoFollowPatternRequest("pattern1") : new GetAutoFollowPatternRequest();
@ -271,6 +291,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
assertThat(pattern.getRemoteCluster(), equalTo(putAutoFollowPatternRequest.getRemoteCluster()));
assertThat(pattern.getLeaderIndexPatterns(), equalTo(putAutoFollowPatternRequest.getLeaderIndexPatterns()));
assertThat(pattern.getFollowIndexNamePattern(), equalTo(putAutoFollowPatternRequest.getFollowIndexNamePattern()));
assertThat(pattern.getSettings(), equalTo(autoFollowerPatternSettings));
// Cleanup:
final DeleteAutoFollowPatternRequest deleteAutoFollowPatternRequest = new DeleteAutoFollowPatternRequest("pattern1");

View File

@ -20,6 +20,8 @@
package org.elasticsearch.client.ccr;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@ -47,8 +49,10 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase<
NavigableMap<String, AutoFollowMetadata.AutoFollowPattern> patterns = new TreeMap<>();
for (int i = 0; i < numPatterns; i++) {
String remoteCluster = randomAlphaOfLength(4);
List<String> leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4));
List<String> leaderIndexPatterns = Collections.singletonList(randomAlphaOfLength(4));
String followIndexNamePattern = randomAlphaOfLength(4);
final Settings settings =
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build();
boolean active = randomBoolean();
Integer maxOutstandingReadRequests = null;
@ -91,10 +95,26 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase<
if (randomBoolean()) {
readPollTimeout = new TimeValue(randomNonNegativeLong());
}
patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters,
followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize,
maxRetryDelay, readPollTimeout));
patterns.put(
randomAlphaOfLength(4),
new AutoFollowMetadata.AutoFollowPattern(
remoteCluster,
leaderIndexPatterns,
followIndexNamePattern,
settings,
active,
maxReadRequestOperationCount,
maxWriteRequestOperationCount,
maxOutstandingReadRequests,
maxOutstandingWriteRequests,
maxReadRequestSize,
maxWriteRequestSize,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout
)
);
}
return new GetAutoFollowPatternAction.Response(patterns);
}
@ -115,6 +135,7 @@ public class GetAutoFollowPatternResponseTests extends AbstractResponseTestCase<
assertThat(serverPattern.getRemoteCluster(), equalTo(clientPattern.getRemoteCluster()));
assertThat(serverPattern.getLeaderIndexPatterns(), equalTo(clientPattern.getLeaderIndexPatterns()));
assertThat(serverPattern.getFollowIndexPattern(), equalTo(clientPattern.getFollowIndexNamePattern()));
assertThat(serverPattern.getSettings(), equalTo(clientPattern.getSettings()));
assertThat(serverPattern.getMaxOutstandingReadRequests(), equalTo(clientPattern.getMaxOutstandingReadRequests()));
assertThat(serverPattern.getMaxOutstandingWriteRequests(), equalTo(clientPattern.getMaxOutstandingWriteRequests()));
assertThat(serverPattern.getMaxReadRequestOperationCount(), equalTo(clientPattern.getMaxReadRequestOperationCount()));

View File

@ -54,6 +54,7 @@ import org.elasticsearch.client.core.BroadcastResponse;
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;
@ -91,6 +92,9 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
"follower", // <3>
ActiveShardCount.ONE // <4>
);
Settings settings =
Settings.builder().put("index.number_of_replicas", 0L).build();
putFollowRequest.setSettings(settings); // <5>
// end::ccr-put-follow-request
// tag::ccr-put-follow-execute
@ -484,6 +488,9 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
Arrays.asList("logs-*", "metrics-*") // <3>
);
request.setFollowIndexNamePattern("copy-{{leader_index}}"); // <4>
Settings settings =
Settings.builder().put("index.number_of_replicas", 0L).build();
request.setSettings(settings); // <5>
// end::ccr-put-auto-follow-pattern-request
// tag::ccr-put-auto-follow-pattern-execute

View File

@ -22,6 +22,7 @@ include-tagged::{doc-tests-file}[{api}-request]
<2> The name of the remote cluster.
<3> The leader index patterns.
<4> The pattern used to create the follower index
<5> The settings overrides for the follower index
[id="{upid}-{api}-response"]
==== Response

View File

@ -22,6 +22,7 @@ include-tagged::{doc-tests-file}[{api}-request]
<3> The name of the follower index that gets created as part of the put follow API call.
<4> The number of active shard copies to wait for before the put follow API returns a
response, as an `ActiveShardCount`
<5> The settings overrides for the follower index.
[id="{upid}-{api}-response"]
==== Response

View File

@ -94,6 +94,9 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
"leader_index*"
],
"follow_index_pattern" : "{{leader_index}}-follower",
"settings": {
"index.number_of_replicas": 0
},
"max_read_request_operation_count" : 1024,
"max_outstanding_read_requests" : 16,
"max_read_request_size" : "1024k",

View File

@ -1,4 +1,8 @@
[testenv="platinum"]
`settings`::
(object) Settings to override from the leader index. Note that certain
settings can not be overrode (e.g., `index.number_of_shards`).
`max_read_request_operation_count`::
(integer) The maximum number of operations to pull per read from the remote
cluster.

View File

@ -90,6 +90,9 @@ PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index",
"settings": {
"index.number_of_replicas": 0
},
"max_read_request_operation_count" : 1024,
"max_outstanding_read_requests" : 16,
"max_read_request_size" : "1024k",

View File

@ -7,13 +7,21 @@
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
public class AutoFollowIT extends ESCCRRestTestCase {
@ -66,7 +74,27 @@ public class AutoFollowIT extends ESCCRRestTestCase {
int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("{\"leader_index_patterns\": [\"metrics-*\"], \"remote_cluster\": \"leader_cluster\"}");
final boolean overrideNumberOfReplicas = randomBoolean();
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("metrics-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
if (overrideNumberOfReplicas) {
bodyBuilder.startObject("settings");
{
bodyBuilder.field("index.number_of_replicas", 0);
}
bodyBuilder.endObject();
}
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));
try (RestClient leaderClient = buildLeaderClient()) {
@ -84,6 +112,11 @@ public class AutoFollowIT extends ESCCRRestTestCase {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
ensureYellow("metrics-20210101");
verifyDocuments("metrics-20210101", 5, "filtered_field:true");
if (overrideNumberOfReplicas) {
assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "0"));
} else {
assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "1"));
}
});
assertBusy(() -> {
verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
@ -91,6 +124,45 @@ public class AutoFollowIT extends ESCCRRestTestCase {
}, 30, TimeUnit.SECONDS);
}
public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
return;
}
final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("metrics-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
bodyBuilder.startObject("settings");
{
bodyBuilder.field("index.number_of_shards", 5);
}
bodyBuilder.endObject();
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
final ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
final Response response = responseException.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(400));
final Map<String, Object> responseAsMap = entityAsMap(response);
assertThat(responseAsMap, hasKey("error"));
assertThat(responseAsMap.get("error"), instanceOf(Map.class));
@SuppressWarnings("unchecked") final Map<Object, Object> error = (Map<Object, Object>) responseAsMap.get("error");
assertThat(error, hasEntry("type", "illegal_argument_exception"));
assertThat(
error,
hasEntry("reason", "can not put auto-follow pattern that could override leader settings {\"index.number_of_shards\":\"5\"}")
);
}
private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));

View File

@ -6,14 +6,20 @@
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
public class FollowIndexIT extends ESCCRRestTestCase {
@ -40,8 +46,26 @@ public class FollowIndexIT extends ESCCRRestTestCase {
} else if ("follow".equals(targetCluster)) {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
final boolean overrideNumberOfReplicas = randomBoolean();
if (overrideNumberOfReplicas) {
followIndex(
client(),
"leader_cluster",
leaderIndexName,
followIndexName,
Settings.builder().put("index.number_of_replicas", 0).build()
);
} else {
followIndex(leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs, "filtered_field:true"));
}
assertBusy(() -> {
verifyDocuments(followIndexName, numDocs, "filtered_field:true");
if (overrideNumberOfReplicas) {
assertThat(getIndexSettingsAsMap("test_index2"), hasEntry("index.number_of_replicas", "0"));
} else {
assertThat(getIndexSettingsAsMap("test_index2"), hasEntry("index.number_of_replicas", "1"));
}
});
// unfollow and then follow and then index a few docs in leader index:
pauseFollow(followIndexName);
resumeFollow(followIndexName);
@ -62,6 +86,29 @@ public class FollowIndexIT extends ESCCRRestTestCase {
}
}
public void testFollowThatOverridesRequiredLeaderSetting() throws IOException {
if ("leader".equals(targetCluster)) {
createIndex("override_leader_index", Settings.EMPTY);
} else {
final Settings settings = Settings.builder().put("index.number_of_shards", 5).build();
final ResponseException responseException = expectThrows(
ResponseException.class,
() -> followIndex(client(), "leader_cluster", "override_leader_index", "override_follow_index", settings)
);
final Response response = responseException.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(400));
final Map<String, Object> responseAsMap = entityAsMap(response);
assertThat(responseAsMap, hasKey("error"));
assertThat(responseAsMap.get("error"), instanceOf(Map.class));
@SuppressWarnings("unchecked") final Map<Object, Object> error = (Map<Object, Object>) responseAsMap.get("error");
assertThat(error, hasEntry("type", "illegal_argument_exception"));
assertThat(
error,
hasEntry("reason", "can not put follower index that could override leader settings {\"index.number_of_shards\":\"5\"}")
);
}
}
public void testFollowNonExistingLeaderIndex() throws Exception {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );

View File

@ -13,6 +13,7 @@ import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -74,9 +75,34 @@ public class ESCCRRestTestCase extends ESRestTestCase {
}
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
followIndex(client, leaderCluster, leaderIndex, followIndex, null);
}
protected static void followIndex(
final RestClient client,
final String leaderCluster,
final String leaderIndex,
final String followIndex,
final Settings settings
) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1");
request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
"\", \"read_poll_timeout\": \"10ms\"}");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.field("remote_cluster", leaderCluster);
bodyBuilder.field("leader_index", leaderIndex);
bodyBuilder.field("read_poll_timeout", "10ms");
if (settings != null) {
bodyBuilder.startObject("settings");
{
settings.toXContent(bodyBuilder, ToXContent.EMPTY_PARAMS);
}
bodyBuilder.endObject();
}
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client.performRequest(request));
}

View File

@ -164,8 +164,22 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"test_alias",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null);
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
Collections.singletonMap("test_alias", autoFollowPattern),
Collections.emptyMap(),

View File

@ -560,6 +560,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowerIndex(followIndexName);
request.setSettings(pattern.getSettings());
request.getParameters().setMaxReadRequestOperationCount(pattern.getMaxReadRequestOperationCount());
request.getParameters().setMaxReadRequestSize(pattern.getMaxReadRequestSize());
request.getParameters().setMaxOutstandingReadRequests(pattern.getMaxOutstandingReadRequests());

View File

@ -94,6 +94,7 @@ public class TransportActivateAutoFollowPatternAction extends TransportMasterNod
previousAutoFollowPattern.getRemoteCluster(),
previousAutoFollowPattern.getLeaderIndexPatterns(),
previousAutoFollowPattern.getFollowIndexPattern(),
previousAutoFollowPattern.getSettings(),
request.isActive(),
previousAutoFollowPattern.getMaxReadRequestOperationCount(),
previousAutoFollowPattern.getMaxWriteRequestOperationCount(),

View File

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -34,6 +35,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@ -78,6 +80,17 @@ public class TransportPutAutoFollowPatternAction extends
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}
final Settings replicatedRequestSettings = TransportResumeFollowAction.filter(request.getSettings());
if (replicatedRequestSettings.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
"can not put auto-follow pattern that could override leader settings %s",
replicatedRequestSettings
);
listener.onFailure(new IllegalArgumentException(message));
return;
}
final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster());
final Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
@ -160,6 +173,7 @@ public class TransportPutAutoFollowPatternAction extends
request.getRemoteCluster(),
request.getLeaderIndexPatterns(),
request.getFollowIndexNamePattern(),
request.getSettings(),
true,
request.getParameters().getMaxReadRequestOperationCount(),
request.getParameters().getMaxWriteRequestOperationCount(),

View File

@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
public final class TransportPutFollowAction
@ -123,14 +124,28 @@ public final class TransportPutFollowAction
return;
}
final Settings.Builder settingsBuilder = Settings.builder()
final Settings replicatedRequestSettings = TransportResumeFollowAction.filter(request.getSettings());
if (replicatedRequestSettings.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
"can not put follower index that could override leader settings %s",
replicatedRequestSettings
);
listener.onFailure(new IllegalArgumentException(message));
return;
}
final Settings overrideSettings = Settings.builder()
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, request.getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(request.getSettings())
.build();
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);
.indexSettings(overrideSettings);
final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {

View File

@ -54,6 +54,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -237,19 +238,36 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
throw new IllegalArgumentException("the following index [" + request.getFollowerIndex() + "] is not ready " +
"to follow; the setting [" + CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey() + "] must be enabled.");
}
// Make a copy, remove settings that are allowed to be different and then compare if the settings are equal.
Settings leaderSettings = filter(leaderIndex.getSettings());
Settings followerSettings = filter(followIndex.getSettings());
if (leaderSettings.equals(followerSettings) == false) {
throw new IllegalArgumentException("the leader index setting[" + leaderSettings + "] and follower index settings [" +
followerSettings + "] must be identical");
}
validateSettings(leaderIndex.getSettings(), followIndex.getSettings());
// Validates if the current follower mapping is mergable with the leader mapping.
// This also validates for example whether specific mapper plugins have been installed
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
}
/**
* Validate that the settings that are required to be identical between the leader and follower index are in fact equal.
*
* @param leaderIndexSettings the leader index settings
* @param followerIndexSettings the follower index settings
* @throws IllegalArgumentException if there are settings that are required to be equal that are not equal
*/
private static void validateSettings(final Settings leaderIndexSettings, final Settings followerIndexSettings) {
// make a copy, remove settings that are allowed to be different, and then compare if the settings are equal
final Settings leaderSettings = filter(leaderIndexSettings);
final Settings followerSettings = filter(followerIndexSettings);
if (leaderSettings.equals(followerSettings) == false) {
final String message = String.format(
Locale.ROOT,
"the leader index settings [%s] and follower index settings [%s] must be identical",
leaderSettings,
followerSettings
);
throw new IllegalArgumentException(message);
}
}
private static ShardFollowTask createShardFollowTask(
int shardId,
String clusterAliasName,
@ -448,7 +466,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
NON_REPLICATED_SETTINGS = Collections.unmodifiableSet(nonReplicatedSettings);
}
static Settings filter(Settings originalSettings) {
public static Settings filter(Settings originalSettings) {
Settings.Builder settings = Settings.builder().put(originalSettings);
// Remove settings that are always going to be different between leader and follow index:
settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());

View File

@ -5,7 +5,9 @@
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -44,7 +46,9 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFol
randomAlphaOfLength(4),
leaderPatterns,
randomAlphaOfLength(4),
true, randomIntBetween(0, Integer.MAX_VALUE),
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build(),
true,
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),

View File

@ -100,8 +100,23 @@ public class CCRFeatureSetTests extends ESTestCase {
int numAutoFollowPatterns = randomIntBetween(0, 32);
Map<String, AutoFollowMetadata.AutoFollowPattern> patterns = new HashMap<>(numAutoFollowPatterns);
for (int i = 0; i < numAutoFollowPatterns; i++) {
AutoFollowMetadata.AutoFollowPattern pattern = new AutoFollowMetadata.AutoFollowPattern("remote_cluser",
Collections.singletonList("logs" + i + "*"), null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowMetadata.AutoFollowPattern pattern = new AutoFollowMetadata.AutoFollowPattern(
"remote_cluser",
Collections.singletonList("logs" + i + "*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
patterns.put("pattern" + i, pattern);
}
metadata.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()));

View File

@ -89,8 +89,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -158,8 +173,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -210,8 +240,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -266,10 +311,41 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
final String remoteCluster = randomAlphaOfLength(5);
final Map<String, AutoFollowPattern> autoFollowPatterns = new HashMap<>(2);
autoFollowPatterns.put("pattern_1", new AutoFollowPattern(remoteCluster, Arrays.asList("logs-*", "test-*"), "copy-", false,
null, null, null, null, null, null, null, null, null, null));
autoFollowPatterns.put("pattern_2", new AutoFollowPattern(remoteCluster, Arrays.asList("users-*"), "copy-", false, null, null,
null, null, null, null, null, null, null, null));
autoFollowPatterns.put("pattern_1", new AutoFollowPattern(
remoteCluster,
Arrays.asList("logs-*", "test-*"),
"copy-",
Settings.EMPTY,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
autoFollowPatterns.put("pattern_2", new AutoFollowPattern(
remoteCluster,
Arrays.asList("users-*"), "copy-",
Settings.EMPTY,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
final Map<String, List<String>> followedLeaderIndexUUIDs = new HashMap<>(2);
followedLeaderIndexUUIDs.put("pattern_1", Arrays.asList("uuid1", "uuid2"));
@ -486,8 +562,22 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY, true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -545,8 +635,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
}
public void testGetLeaderIndicesToFollow() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true,
null, null, null, null, null, null, null, null, null, null);
final AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, Map<String, String>> headers = new HashMap<>();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
@ -601,8 +706,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(result.get(2).getName(), equalTo("metrics-3"));
assertThat(result.get(3).getName(), equalTo("metrics-4"));
final AutoFollowPattern inactiveAutoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null,
false, null, null, null, null, null, null, null, null, null, null);
final AutoFollowPattern inactiveAutoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, Collections.emptyList());
assertThat(result.size(), equalTo(0));
@ -612,8 +732,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
}
public void testGetLeaderIndicesToFollow_shardsNotStarted() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), null, true,
null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
// 1 shard started and another not started:
ClusterState remoteState = createRemoteClusterState("index1", true);
@ -652,8 +787,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
}
public void testGetLeaderIndicesToFollowWithClosedIndices() {
final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
null, true, null, null, null, null, null, null, null, null, null, null);
final AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
// index is opened
ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0);
@ -775,16 +925,61 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
}
public void testGetFollowerIndexName() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, null,
null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0"));
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", true, null, null,
null, null, null, null, null, null, null, null);
autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("metrics-*"),
"eu-metrics-0",
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", true, null,
null, null, null, null, null, null, null, null, null);
autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("metrics-*"),
"eu-{{leader_index}}",
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
}
@ -864,12 +1059,65 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Runnable::run);
// Add 3 patterns:
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put(
"pattern1", new AutoFollowPattern(
"remote1",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
patterns.put(
"pattern2",
new AutoFollowPattern(
"remote2",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
patterns.put(
"pattern3",
new AutoFollowPattern(
"remote2",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
@ -894,8 +1142,26 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue());
assertThat(removedAutoFollower1.removed, is(true));
// Add pattern 4:
patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put(
"pattern4",
new AutoFollowPattern(
"remote1",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
clusterState = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())))
@ -970,12 +1236,64 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
// Add 3 patterns:
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.put(
"pattern1",
new AutoFollowPattern(
"remote1",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
patterns.put(
"pattern2",
new AutoFollowPattern("remote2",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
patterns.put(
"pattern3",
new AutoFollowPattern("remote2",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE,
@ -991,16 +1309,45 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(removedAutoFollower2.removed, is(false));
// Make pattern 1 and pattern 3 inactive
patterns.computeIfPresent("pattern1", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(),
pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(),
pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(),
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout()));
patterns.computeIfPresent("pattern3", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(),
pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(),
pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(),
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout()));
patterns.computeIfPresent(
"pattern1",
(name, pattern) -> new AutoFollowPattern(
pattern.getRemoteCluster(),
pattern.getLeaderIndexPatterns(),
pattern.getFollowIndexPattern(),
Settings.EMPTY,
false,
pattern.getMaxReadRequestOperationCount(),
pattern.getMaxWriteRequestOperationCount(),
pattern.getMaxOutstandingReadRequests(),
pattern.getMaxOutstandingWriteRequests(),
pattern.getMaxReadRequestSize(),
pattern.getMaxWriteRequestSize(),
pattern.getMaxWriteBufferCount(),
pattern.getMaxWriteBufferSize(),
pattern.getMaxRetryDelay(),
pattern.getReadPollTimeout()
)
);
patterns.computeIfPresent(
"pattern3",
(name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
pattern.getLeaderIndexPatterns(),
pattern.getFollowIndexPattern(),
Settings.EMPTY,
false,
pattern.getMaxReadRequestOperationCount(),
pattern.getMaxWriteRequestOperationCount(),
pattern.getMaxOutstandingReadRequests(),
pattern.getMaxOutstandingWriteRequests(),
pattern.getMaxReadRequestSize(),
pattern.getMaxWriteRequestSize(),
pattern.getMaxWriteBufferCount(),
pattern.getMaxWriteBufferSize(),
pattern.getMaxRetryDelay(),
pattern.getReadPollTimeout()
)
);
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE,
@ -1012,13 +1359,45 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(removedAutoFollower2.removed, is(false));
// Add active pattern 4 and make pattern 2 inactive
patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null,
null, null, null, null, null, null, null, null));
patterns.computeIfPresent("pattern2", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(),
pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(),
pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(),
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout()));
patterns.put(
"pattern4",
new AutoFollowPattern(
"remote1",
Collections.singletonList("metrics-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
patterns.computeIfPresent(
"pattern2",
(name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(),
pattern.getLeaderIndexPatterns(),
pattern.getFollowIndexPattern(),
Settings.EMPTY,
false,
pattern.getMaxReadRequestOperationCount(),
pattern.getMaxWriteRequestOperationCount(),
pattern.getMaxOutstandingReadRequests(),
pattern.getMaxOutstandingWriteRequests(),
pattern.getMaxReadRequestSize(),
pattern.getMaxWriteRequestSize(),
pattern.getMaxWriteBufferCount(),
pattern.getMaxWriteBufferSize(),
pattern.getMaxRetryDelay(),
pattern.getReadPollTimeout()
)
);
autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().putCustom(AutoFollowMetadata.TYPE,
@ -1046,8 +1425,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -1109,8 +1503,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -1168,8 +1577,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
ClusterState remoteState = randomBoolean() ? createRemoteClusterState("logs-20190101", false) :
createRemoteClusterState("logs-20190101", null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -1234,8 +1658,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, true, null, null, null, null, null, null, null, null, null, null);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(
"remote",
Collections.singletonList("logs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -1321,7 +1760,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
"remote",
Collections.singletonList("*"),
"{}",
true, 0,
Settings.EMPTY,
true,
0,
0,
0,
0,
@ -1395,9 +1836,27 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
final ClusterState localState = ClusterState.builder(new ClusterName("local"))
.metadata(Metadata.builder()
.putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.singletonMap(pattern,
new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, true,
null, null, null, null, null, null, null, null, null, null)),
new AutoFollowMetadata(
Collections.singletonMap(
pattern,
new AutoFollowPattern(
"remote",
Collections.singletonList("docs-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
),
Collections.singletonMap(pattern, Collections.emptyList()),
Collections.singletonMap(pattern, Collections.emptyMap()))))
.build();

View File

@ -5,7 +5,9 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -33,7 +35,9 @@ public class GetAutoFollowPatternResponseTests extends AbstractWireSerializingTe
"remote",
Collections.singletonList(randomAlphaOfLength(4)),
randomAlphaOfLength(4),
true, randomIntBetween(0, Integer.MAX_VALUE),
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build(),
true,
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),

View File

@ -6,7 +6,9 @@
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -30,6 +32,9 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
request.setRemoteCluster(randomAlphaOfLength(4));
request.setLeaderIndex(randomAlphaOfLength(4));
request.setSettings(
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build()
);
ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
return request;
}
@ -41,6 +46,9 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster(randomAlphaOfLength(4));
request.setLeaderIndex(randomAlphaOfLength(4));
request.setSettings(
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build()
);
request.setFollowerIndex("followerIndex");
ResumeFollowActionRequestTests.generateFollowParameters(request.getParameters());
return request;

View File

@ -8,7 +8,9 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -81,6 +83,7 @@ public class TransportActivateAutoFollowPatternActionTests extends ESTestCase {
return new AutoFollowMetadata.AutoFollowPattern(randomAlphaOfLength(5),
randomSubsetOf(Arrays.asList("test-*", "user-*", "logs-*", "failures-*")),
randomFrom("{{leader_index}}", "{{leader_index}}-follower", "test"),
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build(),
randomBoolean(),
randomIntBetween(1, 100),
randomIntBetween(1, 100),

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction.Request;
@ -32,8 +33,25 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
{
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null,
null, null, null, null, null, null, null));
existingAutoFollowPatterns.put(
"name1",
new AutoFollowPattern(
"eu_cluster",
existingPatterns,
null,
Settings.EMPTY, true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -43,8 +61,26 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
{
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("logs-*");
existingAutoFollowPatterns.put("name2", new AutoFollowPattern("asia_cluster", existingPatterns, null, true, null, null, null,
null, null, null, null, null, null, null));
existingAutoFollowPatterns.put(
"name2",
new AutoFollowPattern(
"asia_cluster",
existingPatterns,
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -76,8 +112,26 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
{
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null,
null, null, null, null, null, null, null));
existingAutoFollowPatterns.put(
"name1",
new AutoFollowPattern(
"eu_cluster",
existingPatterns,
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
existingHeaders.put("key", Collections.singletonMap("key", "val"));
}
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@ -23,10 +24,46 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase {
public void testGetAutoFollowPattern() {
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("name1", new AutoFollowPattern(
"test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null));
patterns.put("name2", new AutoFollowPattern(
"test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null));
patterns.put(
"name1",
new AutoFollowPattern(
"test_alias1",
Collections.singletonList("index-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
patterns.put(
"name2",
new AutoFollowPattern(
"test_alias1",
Collections.singletonList("index-*"),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
Metadata metadata = Metadata.builder()
.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))
.build();

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@ -23,6 +24,7 @@ import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.notNullValue;
public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
@ -32,6 +34,8 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
request.setName("name1");
request.setRemoteCluster("eu_cluster");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
final int numberOfReplicas = randomIntBetween(0, 4);
request.setSettings(Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numberOfReplicas).build());
ClusterState localState = ClusterState.builder(new ClusterName("us_cluster"))
.metadata(Metadata.builder())
@ -48,6 +52,13 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
assertThat(autoFollowMetadata.getPatterns().get("name1").getRemoteCluster(), equalTo("eu_cluster"));
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().size(), equalTo(1));
assertThat(autoFollowMetadata.getPatterns().get("name1").getLeaderIndexPatterns().get(0), equalTo("logs-*"));
assertThat(
autoFollowMetadata.getPatterns().get("name1").getSettings().keySet(),
hasItem(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())
);
assertThat(
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(autoFollowMetadata.getPatterns().get("name1").getSettings()),
equalTo(numberOfReplicas));
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("name1").size(), equalTo(0));
}
@ -102,8 +113,24 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
Map<String, AutoFollowPattern> existingAutoFollowPatterns = new HashMap<>();
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("name1",
new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, null, null, null, null, null, null, null));
existingAutoFollowPatterns.put(
"name1",
new AutoFollowPattern(
"eu_cluster",
existingPatterns,
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null));
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");

View File

@ -154,7 +154,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetadata);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("the leader index setting[{\"index.analysis.analyzer.my_analyzer.tokenizer\"" +
assertThat(e.getMessage(), equalTo("the leader index settings [{\"index.analysis.analyzer.my_analyzer.tokenizer\"" +
":\"whitespace\",\"index.analysis.analyzer.my_analyzer.type\":\"custom\",\"index.number_of_shards\":\"5\"}] " +
"and follower index settings [{\"index.analysis.analyzer.my_analyzer.tokenizer\":\"standard\"," +
"\"index.analysis.analyzer.my_analyzer.type\":\"custom\",\"index.number_of_shards\":\"5\"}] must be identical"));

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@ -180,19 +181,38 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns");
public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern");
public static final ParseField SETTINGS_FIELD = new ParseField("settings");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<AutoFollowPattern, Void> PARSER =
new ConstructingObjectParser<>("auto_follow_pattern",
args -> new AutoFollowPattern((String) args[0], (List<String>) args[1], (String) args[2],
args[3] == null || (boolean) args[3], (Integer) args[4], (Integer) args[5], (Integer) args[6], (Integer) args[7],
(ByteSizeValue) args[8], (ByteSizeValue) args[9], (Integer) args[10], (ByteSizeValue) args[11], (TimeValue) args[12],
(TimeValue) args[13]));
args -> new AutoFollowPattern(
(String) args[0],
(List<String>) args[1],
(String) args[2],
args[3] == null ? Settings.EMPTY : (Settings) args[3],
args[4] == null || (boolean) args[4],
(Integer) args[5],
(Integer) args[6],
(Integer) args[7],
(Integer) args[8],
(ByteSizeValue) args[9],
(ByteSizeValue) args[10],
(Integer) args[11],
(ByteSizeValue) args[12],
(TimeValue) args[13],
(TimeValue) args[14])
);
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD);
PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> Settings.fromXContent(p),
SETTINGS_FIELD
);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACTIVE);
ImmutableFollowParameters.initParser(PARSER);
}
@ -200,11 +220,14 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
private final String remoteCluster;
private final List<String> leaderIndexPatterns;
private final String followIndexPattern;
private final Settings settings;
private final boolean active;
public AutoFollowPattern(String remoteCluster,
public AutoFollowPattern(
String remoteCluster,
List<String> leaderIndexPatterns,
String followIndexPattern,
Settings settings,
boolean active,
Integer maxReadRequestOperationCount,
Integer maxWriteRequestOperationCount,
@ -215,25 +238,37 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
Integer maxWriteBufferCount,
ByteSizeValue maxWriteBufferSize,
TimeValue maxRetryDelay,
TimeValue pollTimeout) {
TimeValue pollTimeout
) {
super(maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests,
maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, pollTimeout);
this.remoteCluster = remoteCluster;
this.leaderIndexPatterns = leaderIndexPatterns;
this.followIndexPattern = followIndexPattern;
this.settings = Objects.requireNonNull(settings);
this.active = active;
}
public static AutoFollowPattern readFrom(StreamInput in) throws IOException {
return new AutoFollowPattern(in.readString(), in.readStringList(), in.readOptionalString(), in);
final String remoteCluster = in.readString();
final List<String> leaderIndexPatterns = in.readStringList();
final String followIndexPattern = in.readOptionalString();
final Settings settings;
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
settings = Settings.readSettingsFromStream(in);
} else {
settings = Settings.EMPTY;
}
return new AutoFollowPattern(remoteCluster, leaderIndexPatterns, followIndexPattern, settings, in);
}
private AutoFollowPattern(String remoteCluster, List<String> leaderIndexPatterns,
String followIndexPattern, StreamInput in) throws IOException {
String followIndexPattern, Settings settings, StreamInput in) throws IOException {
super(in);
this.remoteCluster = remoteCluster;
this.leaderIndexPatterns = leaderIndexPatterns;
this.followIndexPattern = followIndexPattern;
this.settings = Objects.requireNonNull(settings);
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
this.active = in.readBoolean();
} else {
@ -261,6 +296,10 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
return followIndexPattern;
}
public Settings getSettings() {
return settings;
}
public boolean isActive() {
return active;
}
@ -270,6 +309,9 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
out.writeString(remoteCluster);
out.writeStringCollection(leaderIndexPatterns);
out.writeOptionalString(followIndexPattern);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
Settings.writeSettingsToStream(settings, out);
}
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(active);
@ -284,6 +326,13 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
if (followIndexPattern != null) {
builder.field(FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexPattern);
}
if (settings.isEmpty() == false) {
builder.startObject(SETTINGS_FIELD.getPreferredName());
{
settings.toXContent(builder, params);
}
builder.endObject();
}
toXContentFragment(builder);
return builder;
}
@ -297,12 +346,13 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
return active == pattern.active &&
remoteCluster.equals(pattern.remoteCluster) &&
leaderIndexPatterns.equals(pattern.leaderIndexPatterns) &&
followIndexPattern.equals(pattern.followIndexPattern);
followIndexPattern.equals(pattern.followIndexPattern) &&
settings.equals(pattern.settings);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern, active);
return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern, settings, active);
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
@ -26,6 +27,7 @@ import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern.REMOTE_CLUSTER_FIELD;
import static org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern.SETTINGS_FIELD;
public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse> {
@ -49,6 +51,7 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
PARSER.declareString((params, value) -> params.remoteCluster = value, REMOTE_CLUSTER_FIELD);
PARSER.declareStringArray((params, value) -> params.leaderIndexPatterns = value, AutoFollowPattern.LEADER_PATTERNS_FIELD);
PARSER.declareString((params, value) -> params.followIndexNamePattern = value, AutoFollowPattern.FOLLOW_PATTERN_FIELD);
PARSER.declareObject((params, value) -> params.settings = value, (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
FollowParameters.initParser(PARSER);
}
@ -59,6 +62,7 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
request.setRemoteCluster(parameters.remoteCluster);
request.setLeaderIndexPatterns(parameters.leaderIndexPatterns);
request.setFollowIndexNamePattern(parameters.followIndexNamePattern);
request.setSettings(parameters.settings);
request.setParameters(parameters);
return request;
}
@ -67,6 +71,7 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
private String remoteCluster;
private List<String> leaderIndexPatterns;
private String followIndexNamePattern;
private Settings settings = Settings.EMPTY;
private FollowParameters parameters = new FollowParameters();
public Request() {
@ -134,6 +139,14 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
this.followIndexNamePattern = followIndexNamePattern;
}
public Settings getSettings() {
return settings;
}
public void setSettings(final Settings settings) {
this.settings = Objects.requireNonNull(settings);
}
public FollowParameters getParameters() {
return parameters;
}
@ -149,8 +162,12 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
leaderIndexPatterns = in.readStringList();
followIndexNamePattern = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
settings = Settings.readSettingsFromStream(in);
}
parameters = new FollowParameters(in);
} else {
settings = Settings.EMPTY;
parameters = new FollowParameters();
parameters.maxReadRequestOperationCount = in.readOptionalVInt();
parameters.maxReadRequestSize = in.readOptionalWriteable(ByteSizeValue::new);
@ -173,6 +190,9 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
out.writeStringCollection(leaderIndexPatterns);
out.writeOptionalString(followIndexNamePattern);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
Settings.writeSettingsToStream(settings, out);
}
parameters.writeTo(out);
} else {
out.writeOptionalVInt(parameters.maxReadRequestOperationCount);
@ -197,6 +217,13 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
if (followIndexNamePattern != null) {
builder.field(AutoFollowPattern.FOLLOW_PATTERN_FIELD.getPreferredName(), followIndexNamePattern);
}
if (settings.isEmpty() == false) {
builder.startObject(SETTINGS_FIELD.getPreferredName());
{
settings.toXContent(builder, params);
}
builder.endObject();
}
parameters.toXContentFragment(builder);
}
builder.endObject();
@ -226,6 +253,8 @@ public class PutAutoFollowPatternAction extends ActionType<AcknowledgedResponse>
private String remoteCluster;
private List<String> leaderIndexPatterns;
private String followIndexNamePattern;
private Settings settings = Settings.EMPTY;
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -40,6 +41,7 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
private static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
private static final ParseField SETTINGS_FIELD = new ParseField("settings");
// Note that Request should be the Value class here for this parser with a 'parameters' field that maps to
// PutFollowParameters class. But since two minor version are already released with duplicate follow parameters
@ -49,6 +51,11 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
static {
PARSER.declareString((putFollowParameters, value) -> putFollowParameters.remoteCluster = value, REMOTE_CLUSTER_FIELD);
PARSER.declareString((putFollowParameters, value) -> putFollowParameters.leaderIndex = value, LEADER_INDEX_FIELD);
PARSER.declareObject(
(putFollowParameters, value) -> putFollowParameters.settings = value,
(p, c) -> Settings.fromXContent(p),
SETTINGS_FIELD
);
FollowParameters.initParser(PARSER);
}
@ -61,12 +68,14 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
request.setFollowerIndex(followerIndex);
request.setRemoteCluster(parameters.remoteCluster);
request.setLeaderIndex(parameters.leaderIndex);
request.setSettings(parameters.settings);
request.setParameters(parameters);
return request;
}
private String remoteCluster;
private String leaderIndex;
private Settings settings = Settings.EMPTY;
private String followerIndex;
private FollowParameters parameters = new FollowParameters();
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
@ -98,6 +107,14 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
this.leaderIndex = leaderIndex;
}
public Settings getSettings() {
return settings;
}
public void setSettings(final Settings settings) {
this.settings = Objects.requireNonNull(settings);
}
public FollowParameters getParameters() {
return parameters;
}
@ -157,6 +174,9 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
this.remoteCluster = in.readString();
this.leaderIndex = in.readString();
this.followerIndex = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
this.settings = Settings.readSettingsFromStream(in);
}
this.parameters = new FollowParameters(in);
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
waitForActiveShards(ActiveShardCount.readFrom(in));
@ -169,6 +189,9 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
out.writeString(remoteCluster);
out.writeString(leaderIndex);
out.writeString(followerIndex);
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
Settings.writeSettingsToStream(settings, out);
}
parameters.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
waitForActiveShards.writeTo(out);
@ -181,6 +204,13 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
{
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
if (settings.isEmpty() == false) {
builder.startObject(SETTINGS_FIELD.getPreferredName());
{
settings.toXContent(builder, params);
}
builder.endObject();
}
parameters.toXContentFragment(builder);
}
builder.endObject();
@ -209,6 +239,8 @@ public final class PutFollowAction extends ActionType<PutFollowAction.Response>
private String remoteCluster;
private String leaderIndex;
private Settings settings = Settings.EMPTY;
}
}