diff --git a/x-pack/plugin/ccr/qa/multi-cluster-downgraded-to-basic-license/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-downgraded-to-basic-license/build.gradle new file mode 100644 index 00000000000..341b5cda6c9 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-downgraded-to-basic-license/build.gradle @@ -0,0 +1,62 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile project(path: xpackModule('ccr'), configuration: 'runtime') + testCompile project(':x-pack:plugin:ccr:qa') +} + +task leaderClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +leaderClusterTestCluster { + numNodes = 1 + clusterName = 'leader-cluster' + setting 'xpack.license.self_generated.type', 'trial' +} + +leaderClusterTestRunner { + systemProperty 'tests.target_cluster', 'leader' +} + +task writeJavaPolicy { + doLast { + final File tmp = file("${buildDir}/tmp") + if (tmp.exists() == false && tmp.mkdirs() == false) { + throw new GradleException("failed to create temporary directory [${tmp}]") + } + final File javaPolicy = file("${tmp}/java.policy") + javaPolicy.write( + [ + "grant {", + " permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";", + "};" + ].join("\n")) + } +} + +task followClusterTest(type: RestIntegTestTask) {} +followClusterTest.dependsOn writeJavaPolicy + +followClusterTestCluster { + dependsOn leaderClusterTestRunner + numNodes = 1 + clusterName = 'follow-cluster' + setting 'xpack.monitoring.collection.enabled', 'true' + setting 'xpack.license.self_generated.type', 'trial' + setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" +} + +followClusterTestRunner { + systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy" + systemProperty 'tests.target_cluster', 'follow' + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log" + finalizedBy 'leaderClusterTestCluster#stop' +} + +check.dependsOn followClusterTest +test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test diff --git a/x-pack/plugin/ccr/qa/multi-cluster-downgraded-to-basic-license/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster-downgraded-to-basic-license/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java new file mode 100644 index 00000000000..c40571fe1d2 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-downgraded-to-basic-license/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.apache.lucene.util.Constants; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.settings.Settings; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.ObjectPath.eval; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.core.Is.is; + +public class FollowIndexIT extends ESCCRRestTestCase { + + public void testDowngradeRemoteClusterToBasic() throws Exception { + assumeFalse("windows is the worst", Constants.WINDOWS); + if ("follow".equals(targetCluster) == false) { + return; + } + + { + Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); + request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}"); + assertOK(client().performRequest(request)); + } + + String index1 = "logs-20190101"; + try (RestClient leaderClient = buildLeaderClient()) { + createNewIndexAndIndexDocs(leaderClient, index1); + } + + assertBusy(() -> { + ensureYellow(index1); + verifyDocuments(index1, 5, "filtered_field:true"); + }); + + String index2 = "logs-20190102"; + try (RestClient leaderClient = buildLeaderClient()) { + Request request = new Request("POST", "/_xpack/license/start_basic"); + request.addParameter("acknowledge", "true"); + Map response = toMap(leaderClient.performRequest(request)); + assertThat(response.get("basic_was_started"), is(true)); + assertThat(response.get("acknowledged"), is(true)); + + createNewIndexAndIndexDocs(leaderClient, index2); + index(leaderClient, index1, "5", "field", 5, "filtered_field", "true"); + } + + assertBusy(() -> { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + assertThat(eval("auto_follow_stats.number_of_successful_follow_indices", response), equalTo(1)); + assertThat(eval("auto_follow_stats.number_of_failed_remote_cluster_state_requests", response), greaterThanOrEqualTo(1)); + assertThat(eval("auto_follow_stats.recent_auto_follow_errors.0.auto_follow_exception.reason", response), + containsString("the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]")); + + // Follow indices actively following leader indices before the downgrade to basic license remain to follow + // the leader index after the downgrade, so document with id 5 should be replicated to follower index: + verifyDocuments(index1, 6, "filtered_field:true"); + + // Index2 was created in leader cluster after the downgrade and therefor the auto follow coordinator in + // follow cluster should not pick that index up: + assertThat(indexExists(index2), is(false)); + + // parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster + assertBusy(() -> { + final List lines = Files.readAllLines(PathUtils.get(System.getProperty("log"))); + final Iterator it = lines.iterator(); + boolean warn = false; + while (it.hasNext()) { + final String line = it.next(); + if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " + + "failure occurred while fetching cluster state for auto follow pattern \\[test_pattern\\]")) { + warn = true; + break; + } + } + assertTrue(warn); + assertTrue(it.hasNext()); + final String lineAfterWarn = it.next(); + assertThat( + lineAfterWarn, + equalTo("org.elasticsearch.ElasticsearchStatusException: " + + "can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " + + "the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]")); + }); + }); + + // Manually following index2 also does not work after the downgrade: + Exception e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster", index2)); + assertThat(e.getMessage(), containsString("the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]")); + } + + private void createNewIndexAndIndexDocs(RestClient client, String index) throws IOException { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/" + index); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(client.performRequest(request)); + + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(client, index, id, "field", i, "filtered_field", "true"); + } + } + +} diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle index 7f1dd2c3211..f3ac0ffe151 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle @@ -21,24 +21,7 @@ leaderClusterTestRunner { systemProperty 'tests.target_cluster', 'leader' } -task writeJavaPolicy { - doLast { - final File tmp = file("${buildDir}/tmp") - if (tmp.exists() == false && tmp.mkdirs() == false) { - throw new GradleException("failed to create temporary directory [${tmp}]") - } - final File javaPolicy = file("${tmp}/java.policy") - javaPolicy.write( - [ - "grant {", - " permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";", - "};" - ].join("\n")) - } -} - task followClusterTest(type: RestIntegTestTask) {} -followClusterTest.dependsOn writeJavaPolicy followClusterTestCluster { dependsOn leaderClusterTestRunner @@ -49,10 +32,8 @@ followClusterTestCluster { } followClusterTestRunner { - systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy" systemProperty 'tests.target_cluster', 'follow' systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" - systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log" finalizedBy 'leaderClusterTestCluster#stop' } diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java index 074701c7313..fc8de772b25 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -6,18 +6,12 @@ package org.elasticsearch.xpack.ccr; -import org.apache.lucene.util.Constants; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; -import org.elasticsearch.common.io.PathUtils; -import java.nio.file.Files; -import java.util.Iterator; -import java.util.List; import java.util.Locale; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase { @@ -31,36 +25,17 @@ public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase { } public void testAutoFollow() throws Exception { - assumeFalse("windows is the worst", Constants.WINDOWS); if ("follow".equals(targetCluster)) { final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); request.setJsonEntity("{\"leader_index_patterns\":[\"*\"], \"remote_cluster\": \"leader_cluster\"}"); - client().performRequest(request); - - // parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster - assertBusy(() -> { - final List lines = Files.readAllLines(PathUtils.get(System.getProperty("log"))); - - final Iterator it = lines.iterator(); - - boolean warn = false; - while (it.hasNext()) { - final String line = it.next(); - if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " + - "failure occurred while fetching cluster state for auto follow pattern \\[test_pattern\\]")) { - warn = true; - break; - } - } - assertTrue(warn); - assertTrue(it.hasNext()); - final String lineAfterWarn = it.next(); - assertThat( - lineAfterWarn, - equalTo("org.elasticsearch.ElasticsearchStatusException: " + - "can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " + - "the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]")); - }); + final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request)); + final String expected = String.format( + Locale.ROOT, + "can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; " + + "the license mode [BASIC] on cluster [%s] does not enable [ccr]", + "leader_cluster", + "leader_cluster"); + assertThat(e, hasToString(containsString(expected))); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 3d8439fb536..a8194fc1f0f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; import java.util.stream.Collectors; public class TransportPutAutoFollowPatternAction extends @@ -75,47 +76,46 @@ public class TransportPutAutoFollowPatternAction extends return; } final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster()); + final Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Consumer consumer = remoteClusterState -> { + String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); + ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { + if (e == null) { + clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getRemoteCluster(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return innerPut(request, filteredHeaders, currentState, remoteClusterState); + } + }); + } else { + listener.onFailure(e); + } + }); + }; + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.clear(); clusterStateRequest.metaData(true); - Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(client, request.getRemoteCluster(), + clusterStateRequest, listener::onFailure, consumer); - String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); - ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { - if (e == null) { - remoteClient.admin().cluster().state( - clusterStateRequest, - ActionListener.wrap( - clusterStateResponse -> { - final ClusterState leaderClusterState = clusterStateResponse.getState(); - clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getRemoteCluster(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return new AcknowledgedResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, filteredHeaders, currentState, leaderClusterState); - } - }); - }, - listener::onFailure)); - } else { - listener.onFailure(e); - } - }); } static ClusterState innerPut(PutAutoFollowPatternAction.Request request, Map filteredHeaders, ClusterState localState, - ClusterState leaderClusterState) { + ClusterState remoteClusterState) { // auto patterns are always overwritten // only already followed index uuids are updated @@ -143,10 +143,10 @@ public class TransportPutAutoFollowPatternAction extends followedLeaderIndices.put(request.getName(), followedIndexUUIDs); // Mark existing leader indices as already auto followed: if (previousPattern != null) { - markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), + markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), previousPattern, followedIndexUUIDs); } else { - markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), + markExistingIndicesAsAutoFollowed(request.getLeaderIndexPatterns(), remoteClusterState.metaData(), followedIndexUUIDs); }