[CCR] Validate remote cluster license as part of put auto follow pattern api call (#35364)
Validate remote cluster license as part of put auto follow pattern api call in addition of validation that when auto follow coordinator starts auto following indices in the leader cluster. Also added qa module that tests what happens to ccr after downgrading to basic license. Existing active follow indices should remain to follow, but the auto follow feature should not pickup new leader indices.
This commit is contained in:
parent
f2d7c94949
commit
ae2af20ae5
|
@ -0,0 +1,62 @@
|
|||
import org.elasticsearch.gradle.test.RestIntegTestTask
|
||||
|
||||
apply plugin: 'elasticsearch.standalone-test'
|
||||
|
||||
dependencies {
|
||||
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
|
||||
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
|
||||
testCompile project(':x-pack:plugin:ccr:qa')
|
||||
}
|
||||
|
||||
task leaderClusterTest(type: RestIntegTestTask) {
|
||||
mustRunAfter(precommit)
|
||||
}
|
||||
|
||||
leaderClusterTestCluster {
|
||||
numNodes = 1
|
||||
clusterName = 'leader-cluster'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
}
|
||||
|
||||
leaderClusterTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'leader'
|
||||
}
|
||||
|
||||
task writeJavaPolicy {
|
||||
doLast {
|
||||
final File tmp = file("${buildDir}/tmp")
|
||||
if (tmp.exists() == false && tmp.mkdirs() == false) {
|
||||
throw new GradleException("failed to create temporary directory [${tmp}]")
|
||||
}
|
||||
final File javaPolicy = file("${tmp}/java.policy")
|
||||
javaPolicy.write(
|
||||
[
|
||||
"grant {",
|
||||
" permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";",
|
||||
"};"
|
||||
].join("\n"))
|
||||
}
|
||||
}
|
||||
|
||||
task followClusterTest(type: RestIntegTestTask) {}
|
||||
followClusterTest.dependsOn writeJavaPolicy
|
||||
|
||||
followClusterTestCluster {
|
||||
dependsOn leaderClusterTestRunner
|
||||
numNodes = 1
|
||||
clusterName = 'follow-cluster'
|
||||
setting 'xpack.monitoring.collection.enabled', 'true'
|
||||
setting 'xpack.license.self_generated.type', 'trial'
|
||||
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
|
||||
}
|
||||
|
||||
followClusterTestRunner {
|
||||
systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy"
|
||||
systemProperty 'tests.target_cluster', 'follow'
|
||||
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
|
||||
systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log"
|
||||
finalizedBy 'leaderClusterTestCluster#stop'
|
||||
}
|
||||
|
||||
check.dependsOn followClusterTest
|
||||
test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ObjectPath.eval;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
public class FollowIndexIT extends ESCCRRestTestCase {
|
||||
|
||||
public void testDowngradeRemoteClusterToBasic() throws Exception {
|
||||
assumeFalse("windows is the worst", Constants.WINDOWS);
|
||||
if ("follow".equals(targetCluster) == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
||||
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
String index1 = "logs-20190101";
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
createNewIndexAndIndexDocs(leaderClient, index1);
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
ensureYellow(index1);
|
||||
verifyDocuments(index1, 5, "filtered_field:true");
|
||||
});
|
||||
|
||||
String index2 = "logs-20190102";
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
Request request = new Request("POST", "/_xpack/license/start_basic");
|
||||
request.addParameter("acknowledge", "true");
|
||||
Map<?, ?> response = toMap(leaderClient.performRequest(request));
|
||||
assertThat(response.get("basic_was_started"), is(true));
|
||||
assertThat(response.get("acknowledged"), is(true));
|
||||
|
||||
createNewIndexAndIndexDocs(leaderClient, index2);
|
||||
index(leaderClient, index1, "5", "field", 5, "filtered_field", "true");
|
||||
}
|
||||
|
||||
assertBusy(() -> {
|
||||
Request statsRequest = new Request("GET", "/_ccr/stats");
|
||||
Map<?, ?> response = toMap(client().performRequest(statsRequest));
|
||||
assertThat(eval("auto_follow_stats.number_of_successful_follow_indices", response), equalTo(1));
|
||||
assertThat(eval("auto_follow_stats.number_of_failed_remote_cluster_state_requests", response), greaterThanOrEqualTo(1));
|
||||
assertThat(eval("auto_follow_stats.recent_auto_follow_errors.0.auto_follow_exception.reason", response),
|
||||
containsString("the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]"));
|
||||
|
||||
// Follow indices actively following leader indices before the downgrade to basic license remain to follow
|
||||
// the leader index after the downgrade, so document with id 5 should be replicated to follower index:
|
||||
verifyDocuments(index1, 6, "filtered_field:true");
|
||||
|
||||
// Index2 was created in leader cluster after the downgrade and therefor the auto follow coordinator in
|
||||
// follow cluster should not pick that index up:
|
||||
assertThat(indexExists(index2), is(false));
|
||||
|
||||
// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
|
||||
assertBusy(() -> {
|
||||
final List<String> lines = Files.readAllLines(PathUtils.get(System.getProperty("log")));
|
||||
final Iterator<String> it = lines.iterator();
|
||||
boolean warn = false;
|
||||
while (it.hasNext()) {
|
||||
final String line = it.next();
|
||||
if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
|
||||
"failure occurred while fetching cluster state for auto follow pattern \\[test_pattern\\]")) {
|
||||
warn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue(warn);
|
||||
assertTrue(it.hasNext());
|
||||
final String lineAfterWarn = it.next();
|
||||
assertThat(
|
||||
lineAfterWarn,
|
||||
equalTo("org.elasticsearch.ElasticsearchStatusException: " +
|
||||
"can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " +
|
||||
"the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]"));
|
||||
});
|
||||
});
|
||||
|
||||
// Manually following index2 also does not work after the downgrade:
|
||||
Exception e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster", index2));
|
||||
assertThat(e.getMessage(), containsString("the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]"));
|
||||
}
|
||||
|
||||
private void createNewIndexAndIndexDocs(RestClient client, String index) throws IOException {
|
||||
Settings settings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.build();
|
||||
Request request = new Request("PUT", "/" + index);
|
||||
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
|
||||
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
|
||||
assertOK(client.performRequest(request));
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
String id = Integer.toString(i);
|
||||
index(client, index, id, "field", i, "filtered_field", "true");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -21,24 +21,7 @@ leaderClusterTestRunner {
|
|||
systemProperty 'tests.target_cluster', 'leader'
|
||||
}
|
||||
|
||||
task writeJavaPolicy {
|
||||
doLast {
|
||||
final File tmp = file("${buildDir}/tmp")
|
||||
if (tmp.exists() == false && tmp.mkdirs() == false) {
|
||||
throw new GradleException("failed to create temporary directory [${tmp}]")
|
||||
}
|
||||
final File javaPolicy = file("${tmp}/java.policy")
|
||||
javaPolicy.write(
|
||||
[
|
||||
"grant {",
|
||||
" permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";",
|
||||
"};"
|
||||
].join("\n"))
|
||||
}
|
||||
}
|
||||
|
||||
task followClusterTest(type: RestIntegTestTask) {}
|
||||
followClusterTest.dependsOn writeJavaPolicy
|
||||
|
||||
followClusterTestCluster {
|
||||
dependsOn leaderClusterTestRunner
|
||||
|
@ -49,10 +32,8 @@ followClusterTestCluster {
|
|||
}
|
||||
|
||||
followClusterTestRunner {
|
||||
systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy"
|
||||
systemProperty 'tests.target_cluster', 'follow'
|
||||
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
|
||||
systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log"
|
||||
finalizedBy 'leaderClusterTestCluster#stop'
|
||||
}
|
||||
|
||||
|
|
|
@ -6,18 +6,12 @@
|
|||
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
|
||||
public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase {
|
||||
|
@ -31,36 +25,17 @@ public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase {
|
|||
}
|
||||
|
||||
public void testAutoFollow() throws Exception {
|
||||
assumeFalse("windows is the worst", Constants.WINDOWS);
|
||||
if ("follow".equals(targetCluster)) {
|
||||
final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
|
||||
request.setJsonEntity("{\"leader_index_patterns\":[\"*\"], \"remote_cluster\": \"leader_cluster\"}");
|
||||
client().performRequest(request);
|
||||
|
||||
// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
|
||||
assertBusy(() -> {
|
||||
final List<String> lines = Files.readAllLines(PathUtils.get(System.getProperty("log")));
|
||||
|
||||
final Iterator<String> it = lines.iterator();
|
||||
|
||||
boolean warn = false;
|
||||
while (it.hasNext()) {
|
||||
final String line = it.next();
|
||||
if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
|
||||
"failure occurred while fetching cluster state for auto follow pattern \\[test_pattern\\]")) {
|
||||
warn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue(warn);
|
||||
assertTrue(it.hasNext());
|
||||
final String lineAfterWarn = it.next();
|
||||
assertThat(
|
||||
lineAfterWarn,
|
||||
equalTo("org.elasticsearch.ElasticsearchStatusException: " +
|
||||
"can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " +
|
||||
"the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]"));
|
||||
});
|
||||
final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
|
||||
final String expected = String.format(
|
||||
Locale.ROOT,
|
||||
"can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; " +
|
||||
"the license mode [BASIC] on cluster [%s] does not enable [ccr]",
|
||||
"leader_cluster",
|
||||
"leader_cluster");
|
||||
assertThat(e, hasToString(containsString(expected)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TransportPutAutoFollowPatternAction extends
|
||||
|
@ -75,47 +76,46 @@ public class TransportPutAutoFollowPatternAction extends
|
|||
return;
|
||||
}
|
||||
final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster());
|
||||
final Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
Consumer<ClusterState> consumer = remoteClusterState -> {
|
||||
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
|
||||
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
|
||||
if (e == null) {
|
||||
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getRemoteCluster(),
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||
return new AcknowledgedResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerPut(request, filteredHeaders, currentState, remoteClusterState);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.clear();
|
||||
clusterStateRequest.metaData(true);
|
||||
|
||||
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(client, request.getRemoteCluster(),
|
||||
clusterStateRequest, listener::onFailure, consumer);
|
||||
|
||||
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
|
||||
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
|
||||
if (e == null) {
|
||||
remoteClient.admin().cluster().state(
|
||||
clusterStateRequest,
|
||||
ActionListener.wrap(
|
||||
clusterStateResponse -> {
|
||||
final ClusterState leaderClusterState = clusterStateResponse.getState();
|
||||
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getRemoteCluster(),
|
||||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
||||
|
||||
@Override
|
||||
protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
||||
return new AcknowledgedResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return innerPut(request, filteredHeaders, currentState, leaderClusterState);
|
||||
}
|
||||
});
|
||||
},
|
||||
listener::onFailure));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
|
||||
Map<String, String> filteredHeaders,
|
||||
ClusterState localState,
|
||||
ClusterState leaderClusterState) {
|
||||
ClusterState remoteClusterState) {
|
||||
// auto patterns are always overwritten
|
||||
// only already followed index uuids are updated
|
||||
|
||||
|
@ -143,10 +143,10 @@ public class TransportPutAutoFollowPatternAction extends
|
|||
followedLeaderIndices.put(request.getName(), followedIndexUUIDs);
|
||||
// Mark existing leader indices as already auto followed:
|
||||
if (previousPattern != null) {
|
||||
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(),
|
||||
markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
|
||||
previousPattern, followedIndexUUIDs);
|
||||
} else {
|
||||
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), leaderClusterState.metaData(),
|
||||
markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(),
|
||||
followedIndexUUIDs);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue