diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 06a4c4a3b55..5b032118bc0 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -249,38 +249,23 @@ public class IndexFollowingIT extends CcrIntegTestCase { final int firstBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); for (int i = 0; i < firstBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + leaderClient().prepareIndex("index1", "_doc").setId(Integer.toString(i)).setSource("f", i).get(); } AtomicBoolean isRunning = new AtomicBoolean(true); // Concurrently index new docs with mapping changes + int numFields = between(10, 20); Thread thread = new Thread(() -> { - int docID = 10000; - char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); - for (char c : chars) { + int numDocs = between(10, 200); + for (int i = 0; i < numDocs; i++) { if (isRunning.get() == false) { break; } - final String source; - long valueToPutInDoc = randomLongBetween(0, 50000); - if (randomBoolean()) { - source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc); - } else { - source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc); - } - for (int i = 1; i < 10; i++) { - if (isRunning.get() == false) { - break; - } - leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get(); - if (rarely()) { - leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); - } - } - if (between(0, 100) < 20) { - leaderClient().admin().indices().prepareFlush("index1").setForce(false).setWaitIfOngoing(false).get(); + final String field = "f-" + between(1, numFields); + leaderClient().prepareIndex("index1", "_doc").setSource(field, between(0, 1000)).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(false).setForce(false).get(); } } }); @@ -298,16 +283,14 @@ public class IndexFollowingIT extends CcrIntegTestCase { final int secondBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + leaderClient().prepareIndex("index1", "_doc").setId(Integer.toString(i)).setSource("f", i).get(); } - - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + for (int i = 0; i < firstBatchNumDocs + secondBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i), 1, TimeUnit.MINUTES); } - isRunning.set(false); thread.join(); + assertIndexFullyReplicatedToFollower("index1", "index2"); } public void testFollowIndexWithoutWaitForComplete() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 980deb05de3..8fcc4894d1b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -10,6 +10,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -365,7 +366,7 @@ public abstract class CcrIntegTestCase extends ESTestCase { protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) { logger.info("ensure green follower indices {}", Arrays.toString(indices)); - return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), + return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(60), waitForNoInitializingShards, indices); } @@ -387,10 +388,21 @@ public abstract class CcrIntegTestCase extends ESTestCase { ClusterHealthResponse actionGet = testCluster.client().admin().cluster().health(healthRequest).actionGet(); if (actionGet.isTimedOut()) { - logger.info("{} timed out, cluster state:\n{}\n{}", + logger.info("{} timed out: " + + "\nleader cluster state:\n{}" + + "\nleader cluster hot threads:\n{}" + + "\nleader cluster tasks:\n{}" + + "\nfollower cluster state:\n{}" + + "\nfollower cluster hot threads:\n{}" + + "\nfollower cluster tasks:\n{}", method, - testCluster.client().admin().cluster().prepareState().get().getState(), - testCluster.client().admin().cluster().preparePendingClusterTasks().get()); + leaderClient().admin().cluster().prepareState().get().getState(), + getHotThreads(leaderClient()), + leaderClient().admin().cluster().preparePendingClusterTasks().get(), + followerClient().admin().cluster().prepareState().get().getState(), + getHotThreads(followerClient()), + followerClient().admin().cluster().preparePendingClusterTasks().get() + ); fail("timed out waiting for " + color + " state"); } assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(), @@ -399,6 +411,11 @@ public abstract class CcrIntegTestCase extends ESTestCase { return actionGet.getStatus(); } + static String getHotThreads(Client client) { + return client.admin().cluster().prepareNodesHotThreads().setThreads(99999).setIgnoreIdleThreads(false) + .get().getNodes().stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n")); + } + protected final Index resolveLeaderIndex(String index) { GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get(); assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));