Enable reading auto-follow patterns from x-content (#40130)
This named writable was never registered, so it means that we could not read auto-follow patterns that were registered in the cluster state. This causes them to be lost on restarts, a bad bug. This commit addresses this by registering this named writable, and we add a basic CCR restart test to ensure that CCR keeps functioning properly when the follower is restarted.
This commit is contained in:
parent
d093205b6a
commit
f88e4181ca
|
@ -0,0 +1,61 @@
|
|||
import org.elasticsearch.gradle.test.RestIntegTestTask
|
||||
|
||||
apply plugin: 'elasticsearch.standalone-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(':x-pack:plugin:ccr:qa')
|
||||
}
|
||||
|
||||
task leaderClusterTest(type: RestIntegTestTask) {
|
||||
mustRunAfter(precommit)
|
||||
}
|
||||
|
||||
leaderClusterTestCluster {
|
||||
numNodes = 1
|
||||
clusterName = 'leader-cluster'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
setting 'node.name', 'leader'
|
||||
}
|
||||
|
||||
leaderClusterTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'leader'
|
||||
}
|
||||
|
||||
task followClusterTest(type: RestIntegTestTask) {}
|
||||
|
||||
followClusterTestCluster {
|
||||
dependsOn leaderClusterTestRunner
|
||||
numNodes = 1
|
||||
clusterName = 'follow-cluster'
|
||||
setting 'xpack.monitoring.collection.enabled', 'true'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
|
||||
setting 'node.name', 'follow'
|
||||
}
|
||||
|
||||
followClusterTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'follow'
|
||||
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
|
||||
}
|
||||
|
||||
task followClusterRestartTest(type: RestIntegTestTask) {}
|
||||
|
||||
followClusterRestartTestCluster {
|
||||
dependsOn followClusterTestRunner
|
||||
numNodes = 1
|
||||
clusterName = 'follow-cluster'
|
||||
dataDir = { nodeNumber -> followClusterTest.nodes[0].dataDir }
|
||||
setting 'xpack.monitoring.collection.enabled', 'true'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
|
||||
setting 'node.name', 'follow'
|
||||
}
|
||||
|
||||
followClusterRestartTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'follow-restart'
|
||||
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
|
||||
finalizedBy 'leaderClusterTestCluster#stop'
|
||||
}
|
||||
|
||||
check.dependsOn followClusterRestartTest
|
||||
unitTest.enabled = false
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RestartIT extends ESCCRRestTestCase {
|
||||
|
||||
public void testRestart() throws Exception {
|
||||
final int numberOfDocuments = 128;
|
||||
final String testsTargetCluster = System.getProperty("tests.target_cluster");
|
||||
switch (testsTargetCluster) {
|
||||
case "leader": {
|
||||
// create a single index "leader" on the leader
|
||||
createIndexAndIndexDocuments("leader", numberOfDocuments, client());
|
||||
break;
|
||||
}
|
||||
case "follow": {
|
||||
// follow "leader" with "follow-leader" on the follower
|
||||
followIndex("leader", "follow-leader");
|
||||
verifyFollower("follow-leader", numberOfDocuments, client());
|
||||
|
||||
// now create an auto-follow pattern for "leader-*"
|
||||
final Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
|
||||
putPatternRequest.setJsonEntity("{" +
|
||||
"\"leader_index_patterns\": [\"leader-*\"]," +
|
||||
"\"remote_cluster\": \"leader_cluster\"," +
|
||||
"\"follow_index_pattern\":\"follow-{{leader_index}}\"}");
|
||||
assertOK(client().performRequest(putPatternRequest));
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
// create "leader-1" on the leader, which should be replicated to "follow-leader-1" on the follower
|
||||
createIndexAndIndexDocuments("leader-1", numberOfDocuments, leaderClient);
|
||||
// the follower should catch up
|
||||
verifyFollower("follow-leader-1", numberOfDocuments, client());
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "follow-restart": {
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
// create "leader-2" on the leader, and index some additional documents into existing indices
|
||||
createIndexAndIndexDocuments("leader-2", numberOfDocuments, leaderClient);
|
||||
for (final String index : new String[]{"leader", "leader-1", "leader-2"}) {
|
||||
indexDocuments(index, numberOfDocuments, numberOfDocuments, leaderClient);
|
||||
}
|
||||
// the followers should catch up
|
||||
for (final String index : new String[]{"follow-leader", "follow-leader-1", "follow-leader-2"}) {
|
||||
logger.info("verifying {} using {}", index, client().getNodes());
|
||||
verifyFollower(index, 2 * numberOfDocuments, client());
|
||||
}
|
||||
// one more index "leader-3" on the follower
|
||||
createIndexAndIndexDocuments("leader-3", 2 * numberOfDocuments, leaderClient);
|
||||
// the follower should catch up
|
||||
verifyFollower("follow-leader-3", 2 * numberOfDocuments, client());
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw new IllegalArgumentException("unexpected value [" + testsTargetCluster + "] for tests.target_cluster");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createIndexAndIndexDocuments(final String index, final int numberOfDocuments, final RestClient client) throws IOException {
|
||||
final Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
|
||||
final Request createIndexRequest = new Request("PUT", "/" + index);
|
||||
createIndexRequest.setJsonEntity("{\"settings\":" + Strings.toString(settings) + "}");
|
||||
assertOK(client.performRequest(createIndexRequest));
|
||||
indexDocuments(index, numberOfDocuments, 0, client);
|
||||
}
|
||||
|
||||
private void indexDocuments(
|
||||
final String index,
|
||||
final int numberOfDocuments,
|
||||
final int initial,
|
||||
final RestClient client) throws IOException {
|
||||
for (int i = 0, j = initial; i < numberOfDocuments; i++, j++) {
|
||||
index(client, index, Integer.toString(j), "field", j);
|
||||
}
|
||||
assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
|
||||
}
|
||||
|
||||
private void verifyFollower(final String index, final int numberOfDocuments, final RestClient client) throws Exception {
|
||||
assertBusy(() -> {
|
||||
ensureYellow(index, client);
|
||||
verifyDocuments(index, numberOfDocuments, "*:*", client);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -120,13 +120,13 @@ public class ESCCRRestTestCase extends ESRestTestCase {
|
|||
Map<String, ?> response = toMap(client.performRequest(request));
|
||||
|
||||
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
|
||||
assertThat(numDocs, equalTo(expectedNumDocs));
|
||||
assertThat(index, numDocs, equalTo(expectedNumDocs));
|
||||
|
||||
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
|
||||
assertThat(hits.size(), equalTo(expectedNumDocs));
|
||||
for (int i = 0; i < expectedNumDocs; i++) {
|
||||
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
|
||||
assertThat(i, equalTo(value));
|
||||
assertThat(index, i, equalTo(value));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,15 +205,19 @@ public class ESCCRRestTestCase extends ESRestTestCase {
|
|||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
|
||||
}
|
||||
|
||||
protected static void ensureYellow(String index) throws IOException {
|
||||
Request request = new Request("GET", "/_cluster/health/" + index);
|
||||
protected static void ensureYellow(final String index) throws IOException {
|
||||
ensureYellow(index, adminClient());
|
||||
}
|
||||
|
||||
protected static void ensureYellow(final String index, final RestClient client) throws IOException {
|
||||
final Request request = new Request("GET", "/_cluster/health/" + index);
|
||||
request.addParameter("wait_for_status", "yellow");
|
||||
request.addParameter("wait_for_active_shards", "1");
|
||||
request.addParameter("wait_for_no_relocating_shards", "true");
|
||||
request.addParameter("wait_for_no_initializing_shards", "true");
|
||||
request.addParameter("timeout", "70s");
|
||||
request.addParameter("timeout", "5s");
|
||||
request.addParameter("level", "shards");
|
||||
adminClient().performRequest(request);
|
||||
client.performRequest(request);
|
||||
}
|
||||
|
||||
protected int countCcrNodeTasks() throws IOException {
|
||||
|
|
|
@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionResponse;
|
|||
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
|
@ -84,6 +85,7 @@ import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
|
|||
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
|
@ -275,11 +277,17 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
|
||||
public List<NamedXContentRegistry.Entry> getNamedXContent() {
|
||||
return Arrays.asList(
|
||||
// Persistent action requests
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME),
|
||||
// auto-follow metadata, persisted into the cluster state as XContent
|
||||
new NamedXContentRegistry.Entry(
|
||||
MetaData.Custom.class,
|
||||
new ParseField(AutoFollowMetadata.TYPE),
|
||||
AutoFollowMetadata::fromXContent),
|
||||
// persistent action requests
|
||||
new NamedXContentRegistry.Entry(
|
||||
PersistentTaskParams.class,
|
||||
new ParseField(ShardFollowTask.NAME),
|
||||
ShardFollowTask::fromXContent),
|
||||
|
||||
// Task statuses
|
||||
// task statuses
|
||||
new NamedXContentRegistry.Entry(
|
||||
ShardFollowNodeTaskStatus.class,
|
||||
new ParseField(ShardFollowNodeTaskStatus.STATUS_PARSER_NAME),
|
||||
|
|
Loading…
Reference in New Issue