diff --git a/x-pack/plugin/ccr/qa/build.gradle b/x-pack/plugin/ccr/qa/build.gradle index dc44f8f753d..f408e6a78b6 100644 --- a/x-pack/plugin/ccr/qa/build.gradle +++ b/x-pack/plugin/ccr/qa/build.gradle @@ -1,5 +1,12 @@ import org.elasticsearch.gradle.test.RestIntegTestTask +apply plugin: 'elasticsearch.build' +test.enabled = false + +dependencies { + compile project(':test:framework') +} + subprojects { project.tasks.withType(RestIntegTestTask) { final File xPackResources = new File(xpackProject('plugin').projectDir, 'src/test/resources') diff --git a/x-pack/plugin/ccr/qa/chain/build.gradle b/x-pack/plugin/ccr/qa/chain/build.gradle index 7b3e20f86ce..f93feb4a66a 100644 --- a/x-pack/plugin/ccr/qa/chain/build.gradle +++ b/x-pack/plugin/ccr/qa/chain/build.gradle @@ -5,6 +5,7 @@ apply plugin: 'elasticsearch.standalone-test' dependencies { testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') testCompile project(path: xpackModule('ccr'), configuration: 'runtime') + testCompile project(':x-pack:plugin:ccr:qa') } task leaderClusterTest(type: RestIntegTestTask) { diff --git a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java index 1a8a8e0096f..e5a37aa829b 100644 --- a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java +++ b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java @@ -6,34 +6,10 @@ package org.elasticsearch.xpack.ccr; -import org.apache.http.HttpHost; -import org.apache.http.util.EntityUtils; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.test.rest.ESRestTestCase; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.Matchers.equalTo; - -public class ChainIT extends ESRestTestCase { - - private final String targetCluster = System.getProperty("tests.target_cluster"); - - @Override - protected boolean preserveClusterUponCompletion() { - return true; - } +public class ChainIT extends ESCCRRestTestCase { public void testFollowIndex() throws Exception { final int numDocs = 128; @@ -60,23 +36,23 @@ public class ChainIT extends ESRestTestCase { index(client(), leaderIndexName, Integer.toString(i), "field", i, "filtered_field", "true"); } refresh(leaderIndexName); - verifyDocuments(leaderIndexName, numDocs); + verifyDocuments(leaderIndexName, numDocs, "filtered_field:true"); } else if ("middle".equals(targetCluster)) { logger.info("Running against middle cluster"); followIndex("leader_cluster", leaderIndexName, middleIndexName); - assertBusy(() -> verifyDocuments(middleIndexName, numDocs)); + assertBusy(() -> verifyDocuments(middleIndexName, numDocs, "filtered_field:true")); try (RestClient leaderClient = buildLeaderClient()) { int id = numDocs; index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true"); index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1, "filtered_field", "true"); index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true"); } - assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 3)); + assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 3, "filtered_field:true")); } else if ("follow".equals(targetCluster)) { logger.info("Running against follow cluster"); final String followIndexName = "follow"; followIndex("middle_cluster", middleIndexName, followIndexName); - assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); + assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3, "filtered_field:true")); try (RestClient leaderClient = buildLeaderClient()) { int id = numDocs + 3; @@ -86,82 +62,13 @@ public class ChainIT extends ESRestTestCase { } try (RestClient middleClient = buildMiddleClient()) { - assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 6, middleClient)); + assertBusy(() -> verifyDocuments(middleIndexName, numDocs + 6, "filtered_field:true", middleClient)); } - assertBusy(() -> verifyDocuments(followIndexName, numDocs + 6)); + assertBusy(() -> verifyDocuments(followIndexName, numDocs + 6, "filtered_field:true")); } else { fail("unexpected target cluster [" + targetCluster + "]"); } } - private static void index(RestClient client, String index, String id, Object... fields) throws IOException { - XContentBuilder document = jsonBuilder().startObject(); - for (int i = 0; i < fields.length; i += 2) { - document.field((String) fields[i], fields[i + 1]); - } - document.endObject(); - final Request request = new Request("POST", "/" + index + "/_doc/" + id); - request.setJsonEntity(Strings.toString(document)); - assertOK(client.performRequest(request)); - } - - private static void refresh(String index) throws IOException { - assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh"))); - } - - private static void followIndex(String leaderCluster, String leaderIndex, String followIndex) throws IOException { - final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity( - "{\"leader_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); - } - - private static void verifyDocuments(String index, int expectedNumDocs) throws IOException { - verifyDocuments(index, expectedNumDocs, client()); - } - - private static void verifyDocuments(final String index, final int expectedNumDocs, final RestClient client) throws IOException { - final Request request = new Request("GET", "/" + index + "/_search"); - request.addParameter("size", Integer.toString(expectedNumDocs)); - request.addParameter("sort", "field:asc"); - request.addParameter("q", "filtered_field:true"); - Map response = toMap(client.performRequest(request)); - - int numDocs = (int) XContentMapValues.extractValue("hits.total", response); - assertThat(numDocs, equalTo(expectedNumDocs)); - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), equalTo(expectedNumDocs)); - for (int i = 0; i < expectedNumDocs; i++) { - int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); - assertThat(i, equalTo(value)); - } - } - - private static Map toMap(Response response) throws IOException { - return toMap(EntityUtils.toString(response.getEntity())); - } - - private static Map toMap(String response) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); - } - - private RestClient buildLeaderClient() throws IOException { - assert "leader".equals(targetCluster) == false; - return buildClient(System.getProperty("tests.leader_host")); - } - - private RestClient buildMiddleClient() throws IOException { - assert "middle".equals(targetCluster) == false; - return buildClient(System.getProperty("tests.middle_host")); - } - - private RestClient buildClient(final String url) throws IOException { - int portSeparator = url.lastIndexOf(':'); - HttpHost httpHost = new HttpHost(url.substring(0, portSeparator), - Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); - return buildClient(Settings.EMPTY, new HttpHost[]{httpHost}); - } - } diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle index 845c9df533d..7f1dd2c3211 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/build.gradle @@ -5,6 +5,7 @@ apply plugin: 'elasticsearch.standalone-test' dependencies { testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') testCompile project(path: xpackModule('ccr'), configuration: 'runtime') + testCompile project(':x-pack:plugin:ccr:qa:') } task leaderClusterTest(type: RestIntegTestTask) { @@ -17,7 +18,7 @@ leaderClusterTestCluster { } leaderClusterTestRunner { - systemProperty 'tests.is_leader_cluster', 'true' + systemProperty 'tests.target_cluster', 'leader' } task writeJavaPolicy { @@ -49,7 +50,7 @@ followClusterTestCluster { followClusterTestRunner { systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy" - systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.target_cluster', 'follow' systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log" finalizedBy 'leaderClusterTestCluster#stop' diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java index 988f6b97bd2..7e85c19d7b9 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -9,9 +9,7 @@ package org.elasticsearch.xpack.ccr; import org.apache.lucene.util.Constants; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; -import org.elasticsearch.common.Booleans; import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.test.rest.ESRestTestCase; import java.nio.file.Files; import java.util.Iterator; @@ -22,17 +20,10 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; -public class CcrMultiClusterLicenseIT extends ESRestTestCase { - - private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); - - @Override - protected boolean preserveClusterUponCompletion() { - return true; - } +public class CcrMultiClusterLicenseIT extends ESCCRRestTestCase { public void testFollow() { - if (runningAgainstLeaderCluster == false) { + if ("follow".equals(targetCluster)) { final Request request = new Request("PUT", "/follower/_ccr/follow"); request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}"); assertNonCompliantLicense(request); @@ -41,7 +32,7 @@ public class CcrMultiClusterLicenseIT extends ESRestTestCase { public void testAutoFollow() throws Exception { assumeFalse("windows is the worst", Constants.WINDOWS); - if (runningAgainstLeaderCluster == false) { + if ("follow".equals(targetCluster)) { final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); request.setJsonEntity("{\"leader_index_patterns\":[\"*\"], \"leader_cluster\": \"leader_cluster\"}"); client().performRequest(request); diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle index 418c4e6d249..f005a71b165 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle @@ -5,6 +5,7 @@ apply plugin: 'elasticsearch.standalone-test' dependencies { testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') testCompile project(path: xpackModule('ccr'), configuration: 'runtime') + testCompile project(':x-pack:plugin:ccr:qa') } task leaderClusterTest(type: RestIntegTestTask) { @@ -35,7 +36,7 @@ leaderClusterTestCluster { } leaderClusterTestRunner { - systemProperty 'tests.is_leader_cluster', 'true' + systemProperty 'tests.target_cluster', 'leader' } task followClusterTest(type: RestIntegTestTask) {} @@ -66,7 +67,7 @@ followClusterTestCluster { } followClusterTestRunner { - systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.target_cluster', 'follow' systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" finalizedBy 'leaderClusterTestCluster#stop' } diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index d5e7cbcce49..18e061f3790 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -5,39 +5,24 @@ */ package org.elasticsearch.xpack.ccr; -import org.apache.http.HttpHost; -import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.rest.ESRestTestCase; -import java.io.IOException; import java.util.List; import java.util.Map; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; -public class FollowIndexSecurityIT extends ESRestTestCase { - - private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); +public class FollowIndexSecurityIT extends ESCCRRestTestCase { @Override protected Settings restClientSettings() { @@ -55,16 +40,11 @@ public class FollowIndexSecurityIT extends ESRestTestCase { .build(); } - @Override - protected boolean preserveClusterUponCompletion() { - return true; - } - public void testFollowIndex() throws Exception { final int numDocs = 16; final String allowedIndex = "allowed-index"; final String unallowedIndex = "unallowed-index"; - if (runningAgainstLeaderCluster) { + if ("leader".equals(targetCluster)) { logger.info("Running against leader cluster"); Settings indexSettings = Settings.builder().put("index.soft_deletes.enabled", true).build(); createIndex(allowedIndex, indexSettings); @@ -78,10 +58,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase { index(unallowedIndex, Integer.toString(i), "field", i); } refresh(allowedIndex); - verifyDocuments(adminClient(), allowedIndex, numDocs); + verifyDocuments(allowedIndex, numDocs, "*:*"); } else { - follow(client(), allowedIndex, allowedIndex); - assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs)); + followIndex(client(), "leader_cluster", allowedIndex, allowedIndex); + assertBusy(() -> verifyDocuments(allowedIndex, numDocs, "*:*")); assertThat(countCcrNodeTasks(), equalTo(1)); assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex)); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow"))); @@ -110,30 +90,31 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata")); // User does not have manage_follow_index index privilege for 'unallowedIndex': - e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, unallowedIndex)); + e = expectThrows(ResponseException.class, () -> followIndex(client(), "leader_cluster", unallowedIndex, unallowedIndex)); assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]")); // Verify that the follow index has not been created and no node tasks are running - assertThat(indexExists(adminClient(), unallowedIndex), is(false)); + assertThat(indexExists(unallowedIndex), is(false)); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); // User does have manage_follow_index index privilege on 'allowed' index, // but not read / monitor roles on 'disallowed' index: - e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, allowedIndex)); + e = expectThrows(ResponseException.class, () -> followIndex(client(), "leader_cluster", unallowedIndex, allowedIndex)); assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " + "privilege for action [indices:monitor/stats] is missing, " + "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing")); // Verify that the follow index has not been created and no node tasks are running - assertThat(indexExists(adminClient(), unallowedIndex), is(false)); + assertThat(indexExists(unallowedIndex), is(false)); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - follow(adminClient(), unallowedIndex, unallowedIndex); + followIndex(adminClient(), "leader_cluster", unallowedIndex, unallowedIndex); pauseFollow(adminClient(), unallowedIndex); e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex)); assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " + "privilege for action [indices:monitor/stats] is missing, " + "privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing")); + assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); e = expectThrows(ResponseException.class, () -> client().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow"))); @@ -145,7 +126,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { } public void testAutoFollowPatterns() throws Exception { - assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); + assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster)); String allowedIndex = "logs-eu-20190101"; String disallowedIndex = "logs-us-20190101"; @@ -180,9 +161,9 @@ public class FollowIndexSecurityIT extends ESRestTestCase { assertBusy(() -> { ensureYellow(allowedIndex); - verifyDocuments(adminClient(), allowedIndex, 5); + verifyDocuments(allowedIndex, 5, "*:*"); }); - assertThat(indexExists(adminClient(), disallowedIndex), is(false)); + assertThat(indexExists(disallowedIndex), is(false)); assertBusy(() -> { verifyCcrMonitoring(allowedIndex, allowedIndex); verifyAutoFollowMonitoring(); @@ -194,181 +175,4 @@ public class FollowIndexSecurityIT extends ESRestTestCase { pauseFollow(client(), allowedIndex); } - private int countCcrNodeTasks() throws IOException { - final Request request = new Request("GET", "/_tasks"); - request.addParameter("detailed", "true"); - Map rsp1 = toMap(adminClient().performRequest(request)); - Map nodes = (Map) rsp1.get("nodes"); - assertThat(nodes.size(), equalTo(1)); - Map node = (Map) nodes.values().iterator().next(); - Map nodeTasks = (Map) node.get("tasks"); - int numNodeTasks = 0; - for (Map.Entry entry : nodeTasks.entrySet()) { - Map nodeTask = (Map) entry.getValue(); - String action = (String) nodeTask.get("action"); - if (action.startsWith("xpack/ccr/shard_follow_task")) { - numNodeTasks++; - } - } - return numNodeTasks; - } - - private static void index(String index, String id, Object... fields) throws IOException { - index(adminClient(), index, id, fields); - } - - private static void index(RestClient client, String index, String id, Object... fields) throws IOException { - XContentBuilder document = jsonBuilder().startObject(); - for (int i = 0; i < fields.length; i += 2) { - document.field((String) fields[i], fields[i + 1]); - } - document.endObject(); - final Request request = new Request("POST", "/" + index + "/_doc/" + id); - request.setJsonEntity(Strings.toString(document)); - assertOK(client.performRequest(request)); - } - - private static void refresh(String index) throws IOException { - assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh"))); - } - - private static void resumeFollow(String followIndex) throws IOException { - final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow"); - request.setJsonEntity("{\"poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); - } - - private static void follow(RestClient client, String leaderIndex, String followIndex) throws IOException { - final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex + - "\", \"poll_timeout\": \"10ms\"}"); - assertOK(client.performRequest(request)); - } - - void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException { - final Request request = new Request("GET", "/" + index + "/_search"); - request.addParameter("pretty", "true"); - request.addParameter("size", Integer.toString(expectedNumDocs)); - request.addParameter("sort", "field:asc"); - Map response = toMap(client.performRequest(request)); - - int numDocs = (int) XContentMapValues.extractValue("hits.total", response); - assertThat(numDocs, equalTo(expectedNumDocs)); - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), equalTo(expectedNumDocs)); - for (int i = 0; i < expectedNumDocs; i++) { - int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); - assertThat(i, equalTo(value)); - } - } - - private static Map toMap(Response response) throws IOException { - return toMap(EntityUtils.toString(response.getEntity())); - } - - private static Map toMap(String response) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); - } - - protected static void createIndex(String name, Settings settings) throws IOException { - createIndex(name, settings, ""); - } - - protected static void createIndex(String name, Settings settings, String mapping) throws IOException { - final Request request = new Request("PUT", "/" + name); - request.setJsonEntity("{ \"settings\": " + Strings.toString(settings) + ", \"mappings\" : {" + mapping + "} }"); - assertOK(adminClient().performRequest(request)); - } - - private static void ensureYellow(String index) throws IOException { - Request request = new Request("GET", "/_cluster/health/" + index); - request.addParameter("wait_for_status", "yellow"); - request.addParameter("wait_for_no_relocating_shards", "true"); - request.addParameter("wait_for_no_initializing_shards", "true"); - request.addParameter("timeout", "70s"); - request.addParameter("level", "shards"); - adminClient().performRequest(request); - } - - private RestClient buildLeaderClient() throws IOException { - assert runningAgainstLeaderCluster == false; - String leaderUrl = System.getProperty("tests.leader_host"); - int portSeparator = leaderUrl.lastIndexOf(':'); - HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator), - Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol()); - return buildClient(restAdminSettings(), new HttpHost[]{httpHost}); - } - - private static boolean indexExists(RestClient client, String index) throws IOException { - Response response = client.performRequest(new Request("HEAD", "/" + index)); - return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); - } - - private static void pauseFollow(RestClient client, String followIndex) throws IOException { - assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow"))); - } - - private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException { - Request request = new Request("GET", "/.monitoring-*/_search"); - request.setJsonEntity("{\"query\": {\"term\": {\"ccr_stats.leader_index\": \"" + expectedLeaderIndex + "\"}}}"); - Map response; - try { - response = toMap(adminClient().performRequest(request)); - } catch (ResponseException e) { - throw new AssertionError("error while searching", e); - } - - int numberOfOperationsReceived = 0; - int numberOfOperationsIndexed = 0; - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), greaterThanOrEqualTo(1)); - - for (int i = 0; i < hits.size(); i++) { - Map hit = (Map) hits.get(i); - String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); - assertThat(leaderIndex, endsWith(expectedLeaderIndex)); - - final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit); - assertThat(followerIndex, equalTo(expectedFollowerIndex)); - - int foundNumberOfOperationsReceived = - (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); - numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived); - int foundNumberOfOperationsIndexed = - (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit); - numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed); - } - - assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1)); - assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); - } - - private static void verifyAutoFollowMonitoring() throws IOException { - Request request = new Request("GET", "/.monitoring-*/_search"); - request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}"); - Map response; - try { - response = toMap(adminClient().performRequest(request)); - } catch (ResponseException e) { - throw new AssertionError("error while searching", e); - } - - int numberOfSuccessfulFollowIndices = 0; - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), greaterThanOrEqualTo(1)); - - for (int i = 0; i < hits.size(); i++) { - Map hit = (Map) hits.get(i); - - int foundNumberOfOperationsReceived = - (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit); - numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived); - } - - assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1)); - } - } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle index b3b63723848..3e3661aae1a 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle @@ -5,6 +5,7 @@ apply plugin: 'elasticsearch.standalone-test' dependencies { testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') testCompile project(path: xpackModule('ccr'), configuration: 'runtime') + testCompile project(':x-pack:plugin:ccr:qa') } task leaderClusterTest(type: RestIntegTestTask) { @@ -18,7 +19,7 @@ leaderClusterTestCluster { } leaderClusterTestRunner { - systemProperty 'tests.is_leader_cluster', 'true' + systemProperty 'tests.target_cluster', 'leader' } task followClusterTest(type: RestIntegTestTask) {} @@ -33,7 +34,7 @@ followClusterTestCluster { } followClusterTestRunner { - systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.target_cluster', 'follow' systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" finalizedBy 'leaderClusterTestCluster#stop' } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index ff7dc9e72b5..9383d653de6 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -5,44 +5,23 @@ */ package org.elasticsearch.xpack.ccr; -import org.apache.http.HttpHost; -import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; -import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.test.rest.ESRestTestCase; -import java.io.IOException; -import java.util.List; import java.util.Map; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class FollowIndexIT extends ESRestTestCase { - - private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); - - @Override - protected boolean preserveClusterUponCompletion() { - return true; - } +public class FollowIndexIT extends ESCCRRestTestCase { public void testFollowIndex() throws Exception { final int numDocs = 128; final String leaderIndexName = "test_index1"; - if (runningAgainstLeaderCluster) { + if ("leader".equals(targetCluster)) { logger.info("Running against leader cluster"); String mapping = ""; if (randomBoolean()) { // randomly do source filtering on indexing @@ -63,12 +42,12 @@ public class FollowIndexIT extends ESRestTestCase { index(client(), leaderIndexName, Integer.toString(i), "field", i, "filtered_field", "true"); } refresh(leaderIndexName); - verifyDocuments(leaderIndexName, numDocs); + verifyDocuments(leaderIndexName, numDocs, "filtered_field:true"); } else { logger.info("Running against follow cluster"); final String followIndexName = "test_index2"; followIndex(leaderIndexName, followIndexName); - assertBusy(() -> verifyDocuments(followIndexName, numDocs)); + assertBusy(() -> verifyDocuments(followIndexName, numDocs, "filtered_field:true")); // unfollow and then follow and then index a few docs in leader index: pauseFollow(followIndexName); resumeFollow(followIndexName); @@ -78,7 +57,7 @@ public class FollowIndexIT extends ESRestTestCase { index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1, "filtered_field", "true"); index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true"); } - assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); + assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3, "filtered_field:true")); assertBusy(() -> verifyCcrMonitoring(leaderIndexName, followIndexName)); pauseFollow(followIndexName); @@ -90,7 +69,7 @@ public class FollowIndexIT extends ESRestTestCase { } public void testFollowNonExistingLeaderIndex() throws Exception { - assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); + assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster)); ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index")); assertThat(e.getMessage(), containsString("no such index")); assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); @@ -101,7 +80,7 @@ public class FollowIndexIT extends ESRestTestCase { } public void testAutoFollowPatterns() throws Exception { - assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster); + assumeFalse("Test should only run when both clusters are running", "leader".equals(targetCluster)); Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern"); request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"leader_cluster\": \"leader_cluster\"}"); @@ -128,7 +107,7 @@ public class FollowIndexIT extends ESRestTestCase { assertThat(response.get("number_of_successful_follow_indices"), equalTo(1)); ensureYellow("logs-20190101"); - verifyDocuments("logs-20190101", 5); + verifyDocuments("logs-20190101", 5, "filtered_field:true"); }); assertBusy(() -> { verifyCcrMonitoring("logs-20190101", "logs-20190101"); @@ -136,143 +115,4 @@ public class FollowIndexIT extends ESRestTestCase { }); } - private static void index(RestClient client, String index, String id, Object... fields) throws IOException { - XContentBuilder document = jsonBuilder().startObject(); - for (int i = 0; i < fields.length; i += 2) { - document.field((String) fields[i], fields[i + 1]); - } - document.endObject(); - final Request request = new Request("POST", "/" + index + "/_doc/" + id); - request.setJsonEntity(Strings.toString(document)); - assertOK(client.performRequest(request)); - } - - private static void refresh(String index) throws IOException { - assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh"))); - } - - private static void resumeFollow(String followIndex) throws IOException { - final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow"); - request.setJsonEntity("{\"poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); - } - - private static void followIndex(String leaderIndex, String followIndex) throws IOException { - final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex + - "\", \"poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); - } - - private static void pauseFollow(String followIndex) throws IOException { - assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow"))); - } - - private static void verifyDocuments(String index, int expectedNumDocs) throws IOException { - final Request request = new Request("GET", "/" + index + "/_search"); - request.addParameter("size", Integer.toString(expectedNumDocs)); - request.addParameter("sort", "field:asc"); - request.addParameter("q", "filtered_field:true"); - Map response = toMap(client().performRequest(request)); - - int numDocs = (int) XContentMapValues.extractValue("hits.total", response); - assertThat(numDocs, equalTo(expectedNumDocs)); - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), equalTo(expectedNumDocs)); - for (int i = 0; i < expectedNumDocs; i++) { - int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); - assertThat(i, equalTo(value)); - } - } - - private static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException { - Request request = new Request("GET", "/.monitoring-*/_search"); - request.setJsonEntity("{\"query\": {\"term\": {\"ccr_stats.leader_index\": \"" + expectedLeaderIndex + "\"}}}"); - Map response; - try { - response = toMap(client().performRequest(request)); - } catch (ResponseException e) { - throw new AssertionError("error while searching", e); - } - - int numberOfOperationsReceived = 0; - int numberOfOperationsIndexed = 0; - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), greaterThanOrEqualTo(1)); - - for (int i = 0; i < hits.size(); i++) { - Map hit = (Map) hits.get(i); - String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); - assertThat(leaderIndex, endsWith(expectedLeaderIndex)); - - final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit); - assertThat(followerIndex, equalTo(expectedFollowerIndex)); - - int foundNumberOfOperationsReceived = - (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); - numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived); - int foundNumberOfOperationsIndexed = - (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit); - numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed); - } - - assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1)); - assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); - } - - private static void verifyAutoFollowMonitoring() throws IOException { - Request request = new Request("GET", "/.monitoring-*/_search"); - request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}"); - Map response; - try { - response = toMap(client().performRequest(request)); - } catch (ResponseException e) { - throw new AssertionError("error while searching", e); - } - - int numberOfSuccessfulFollowIndices = 0; - - List hits = (List) XContentMapValues.extractValue("hits.hits", response); - assertThat(hits.size(), greaterThanOrEqualTo(1)); - - for (int i = 0; i < hits.size(); i++) { - Map hit = (Map) hits.get(i); - - int foundNumberOfOperationsReceived = - (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit); - numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived); - } - - assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1)); - } - - private static Map toMap(Response response) throws IOException { - return toMap(EntityUtils.toString(response.getEntity())); - } - - private static Map toMap(String response) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); - } - - private static void ensureYellow(String index) throws IOException { - Request request = new Request("GET", "/_cluster/health/" + index); - request.addParameter("wait_for_status", "yellow"); - request.addParameter("wait_for_no_relocating_shards", "true"); - request.addParameter("wait_for_no_initializing_shards", "true"); - request.addParameter("timeout", "70s"); - request.addParameter("level", "shards"); - client().performRequest(request); - } - - private RestClient buildLeaderClient() throws IOException { - assert runningAgainstLeaderCluster == false; - String leaderUrl = System.getProperty("tests.leader_host"); - int portSeparator = leaderUrl.lastIndexOf(':'); - HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator), - Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol()); - return buildClient(Settings.EMPTY, new HttpHost[]{httpHost}); - } - } diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java new file mode 100644 index 00000000000..b2e300ec4be --- /dev/null +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -0,0 +1,245 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.apache.http.HttpHost; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class ESCCRRestTestCase extends ESRestTestCase { + + protected final String targetCluster = System.getProperty("tests.target_cluster"); + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + protected static void index(String index, String id, Object... fields) throws IOException { + index(adminClient(), index, id, fields); + } + + protected static void index(RestClient client, String index, String id, Object... fields) throws IOException { + XContentBuilder document = jsonBuilder().startObject(); + for (int i = 0; i < fields.length; i += 2) { + document.field((String) fields[i], fields[i + 1]); + } + document.endObject(); + final Request request = new Request("POST", "/" + index + "/_doc/" + id); + request.setJsonEntity(Strings.toString(document)); + assertOK(client.performRequest(request)); + } + + protected static void refresh(String index) throws IOException { + assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh"))); + } + + protected static void resumeFollow(String followIndex) throws IOException { + final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow"); + request.setJsonEntity("{\"poll_timeout\": \"10ms\"}"); + assertOK(client().performRequest(request)); + } + + protected static void followIndex(String leaderIndex, String followIndex) throws IOException { + followIndex("leader_cluster", leaderIndex, followIndex); + } + + protected static void followIndex(String leaderCluster, String leaderIndex, String followIndex) throws IOException { + followIndex(client(), leaderCluster, leaderIndex, followIndex); + } + + protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException { + final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); + request.setJsonEntity("{\"leader_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex + + "\", \"poll_timeout\": \"10ms\"}"); + assertOK(client.performRequest(request)); + } + + protected static void pauseFollow(String followIndex) throws IOException { + pauseFollow(client(), followIndex); + } + + protected static void pauseFollow(RestClient client, String followIndex) throws IOException { + assertOK(client.performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow"))); + } + + protected static void verifyDocuments(final String index, final int expectedNumDocs, final String query) throws IOException { + verifyDocuments(index, expectedNumDocs, query, adminClient()); + } + + protected static void verifyDocuments(final String index, + final int expectedNumDocs, + final String query, + final RestClient client) throws IOException { + final Request request = new Request("GET", "/" + index + "/_search"); + request.addParameter("size", Integer.toString(expectedNumDocs)); + request.addParameter("sort", "field:asc"); + request.addParameter("q", query); + Map response = toMap(client.performRequest(request)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(numDocs, equalTo(expectedNumDocs)); + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), equalTo(expectedNumDocs)); + for (int i = 0; i < expectedNumDocs; i++) { + int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); + assertThat(i, equalTo(value)); + } + } + + protected static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException { + Request request = new Request("GET", "/.monitoring-*/_search"); + request.setJsonEntity("{\"query\": {\"term\": {\"ccr_stats.leader_index\": \"" + expectedLeaderIndex + "\"}}}"); + Map response; + try { + response = toMap(adminClient().performRequest(request)); + } catch (ResponseException e) { + throw new AssertionError("error while searching", e); + } + + int numberOfOperationsReceived = 0; + int numberOfOperationsIndexed = 0; + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), greaterThanOrEqualTo(1)); + + for (int i = 0; i < hits.size(); i++) { + Map hit = (Map) hits.get(i); + String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); + assertThat(leaderIndex, endsWith(expectedLeaderIndex)); + + final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit); + assertThat(followerIndex, equalTo(expectedFollowerIndex)); + + int foundNumberOfOperationsReceived = + (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); + numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived); + int foundNumberOfOperationsIndexed = + (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit); + numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed); + } + + assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1)); + assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); + } + + protected static void verifyAutoFollowMonitoring() throws IOException { + Request request = new Request("GET", "/.monitoring-*/_search"); + request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_auto_follow_stats\"}}}"); + Map response; + try { + response = toMap(adminClient().performRequest(request)); + } catch (ResponseException e) { + throw new AssertionError("error while searching", e); + } + + int numberOfSuccessfulFollowIndices = 0; + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), greaterThanOrEqualTo(1)); + + for (int i = 0; i < hits.size(); i++) { + Map hit = (Map) hits.get(i); + + int foundNumberOfOperationsReceived = + (int) XContentMapValues.extractValue("_source.ccr_auto_follow_stats.number_of_successful_follow_indices", hit); + numberOfSuccessfulFollowIndices = Math.max(numberOfSuccessfulFollowIndices, foundNumberOfOperationsReceived); + } + + assertThat(numberOfSuccessfulFollowIndices, greaterThanOrEqualTo(1)); + } + + protected static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + protected static Map toMap(String response) { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + + protected static void ensureYellow(String index) throws IOException { + Request request = new Request("GET", "/_cluster/health/" + index); + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("wait_for_no_initializing_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + adminClient().performRequest(request); + } + + protected int countCcrNodeTasks() throws IOException { + final Request request = new Request("GET", "/_tasks"); + request.addParameter("detailed", "true"); + Map rsp1 = toMap(adminClient().performRequest(request)); + Map nodes = (Map) rsp1.get("nodes"); + assertThat(nodes.size(), equalTo(1)); + Map node = (Map) nodes.values().iterator().next(); + Map nodeTasks = (Map) node.get("tasks"); + int numNodeTasks = 0; + for (Map.Entry entry : nodeTasks.entrySet()) { + Map nodeTask = (Map) entry.getValue(); + String action = (String) nodeTask.get("action"); + if (action.startsWith("xpack/ccr/shard_follow_task")) { + numNodeTasks++; + } + } + return numNodeTasks; + } + + protected static void createIndex(String name, Settings settings) throws IOException { + createIndex(name, settings, ""); + } + + protected static void createIndex(String name, Settings settings, String mapping) throws IOException { + final Request request = new Request("PUT", "/" + name); + request.setJsonEntity("{ \"settings\": " + Strings.toString(settings) + ", \"mappings\" : {" + mapping + "} }"); + assertOK(adminClient().performRequest(request)); + } + + protected static boolean indexExists(String index) throws IOException { + Response response = adminClient().performRequest(new Request("HEAD", "/" + index)); + return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); + } + + protected RestClient buildLeaderClient() throws IOException { + assert "leader".equals(targetCluster) == false; + return buildClient(System.getProperty("tests.leader_host")); + } + + protected RestClient buildMiddleClient() throws IOException { + assert "middle".equals(targetCluster) == false; + return buildClient(System.getProperty("tests.middle_host")); + } + + private RestClient buildClient(final String url) throws IOException { + int portSeparator = url.lastIndexOf(':'); + HttpHost httpHost = new HttpHost(url.substring(0, portSeparator), + Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); + return buildClient(restAdminSettings(), new HttpHost[]{httpHost}); + } + +}