Increase timeout in testFollowIndexWithConcurrentMappingChanges (#60534)
The test failed because the leader was consuming a lot of CPU while processing many mapping updates. This commit reduces the number of mapping updates, increases the timeout, and adds more debug info. Closes #59832
This commit is contained in:
parent
bf7eecf1dc
commit
ceaa28e97b
|
@ -249,38 +249,23 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
||||||
final int firstBatchNumDocs = randomIntBetween(2, 64);
|
final int firstBatchNumDocs = randomIntBetween(2, 64);
|
||||||
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
|
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
|
||||||
for (int i = 0; i < firstBatchNumDocs; i++) {
|
for (int i = 0; i < firstBatchNumDocs; i++) {
|
||||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
leaderClient().prepareIndex("index1", "_doc").setId(Integer.toString(i)).setSource("f", i).get();
|
||||||
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
AtomicBoolean isRunning = new AtomicBoolean(true);
|
AtomicBoolean isRunning = new AtomicBoolean(true);
|
||||||
|
|
||||||
// Concurrently index new docs with mapping changes
|
// Concurrently index new docs with mapping changes
|
||||||
|
int numFields = between(10, 20);
|
||||||
Thread thread = new Thread(() -> {
|
Thread thread = new Thread(() -> {
|
||||||
int docID = 10000;
|
int numDocs = between(10, 200);
|
||||||
char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
|
for (int i = 0; i < numDocs; i++) {
|
||||||
for (char c : chars) {
|
|
||||||
if (isRunning.get() == false) {
|
if (isRunning.get() == false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
final String source;
|
final String field = "f-" + between(1, numFields);
|
||||||
long valueToPutInDoc = randomLongBetween(0, 50000);
|
leaderClient().prepareIndex("index1", "_doc").setSource(field, between(0, 1000)).get();
|
||||||
if (randomBoolean()) {
|
|
||||||
source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc);
|
|
||||||
} else {
|
|
||||||
source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc);
|
|
||||||
}
|
|
||||||
for (int i = 1; i < 10; i++) {
|
|
||||||
if (isRunning.get() == false) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get();
|
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
leaderClient().admin().indices().prepareFlush("index1").setForce(true).get();
|
leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(false).setForce(false).get();
|
||||||
}
|
|
||||||
}
|
|
||||||
if (between(0, 100) < 20) {
|
|
||||||
leaderClient().admin().indices().prepareFlush("index1").setForce(false).setWaitIfOngoing(false).get();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -298,16 +283,14 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
||||||
final int secondBatchNumDocs = randomIntBetween(2, 64);
|
final int secondBatchNumDocs = randomIntBetween(2, 64);
|
||||||
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
|
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
|
||||||
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
||||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
leaderClient().prepareIndex("index1", "_doc").setId(Integer.toString(i)).setSource("f", i).get();
|
||||||
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
|
||||||
}
|
}
|
||||||
|
for (int i = 0; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
||||||
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
|
|
||||||
assertBusy(assertExpectedDocumentRunnable(i), 1, TimeUnit.MINUTES);
|
assertBusy(assertExpectedDocumentRunnable(i), 1, TimeUnit.MINUTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
isRunning.set(false);
|
isRunning.set(false);
|
||||||
thread.join();
|
thread.join();
|
||||||
|
assertIndexFullyReplicatedToFollower("index1", "index2");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFollowIndexWithoutWaitForComplete() throws Exception {
|
public void testFollowIndexWithoutWaitForComplete() throws Exception {
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
|
@ -365,7 +366,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
||||||
|
|
||||||
protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) {
|
protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) {
|
||||||
logger.info("ensure green follower indices {}", Arrays.toString(indices));
|
logger.info("ensure green follower indices {}", Arrays.toString(indices));
|
||||||
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30),
|
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(60),
|
||||||
waitForNoInitializingShards, indices);
|
waitForNoInitializingShards, indices);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,10 +388,21 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
||||||
|
|
||||||
ClusterHealthResponse actionGet = testCluster.client().admin().cluster().health(healthRequest).actionGet();
|
ClusterHealthResponse actionGet = testCluster.client().admin().cluster().health(healthRequest).actionGet();
|
||||||
if (actionGet.isTimedOut()) {
|
if (actionGet.isTimedOut()) {
|
||||||
logger.info("{} timed out, cluster state:\n{}\n{}",
|
logger.info("{} timed out: " +
|
||||||
|
"\nleader cluster state:\n{}" +
|
||||||
|
"\nleader cluster hot threads:\n{}" +
|
||||||
|
"\nleader cluster tasks:\n{}" +
|
||||||
|
"\nfollower cluster state:\n{}" +
|
||||||
|
"\nfollower cluster hot threads:\n{}" +
|
||||||
|
"\nfollower cluster tasks:\n{}",
|
||||||
method,
|
method,
|
||||||
testCluster.client().admin().cluster().prepareState().get().getState(),
|
leaderClient().admin().cluster().prepareState().get().getState(),
|
||||||
testCluster.client().admin().cluster().preparePendingClusterTasks().get());
|
getHotThreads(leaderClient()),
|
||||||
|
leaderClient().admin().cluster().preparePendingClusterTasks().get(),
|
||||||
|
followerClient().admin().cluster().prepareState().get().getState(),
|
||||||
|
getHotThreads(followerClient()),
|
||||||
|
followerClient().admin().cluster().preparePendingClusterTasks().get()
|
||||||
|
);
|
||||||
fail("timed out waiting for " + color + " state");
|
fail("timed out waiting for " + color + " state");
|
||||||
}
|
}
|
||||||
assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),
|
assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),
|
||||||
|
@ -399,6 +411,11 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
||||||
return actionGet.getStatus();
|
return actionGet.getStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static String getHotThreads(Client client) {
|
||||||
|
return client.admin().cluster().prepareNodesHotThreads().setThreads(99999).setIgnoreIdleThreads(false)
|
||||||
|
.get().getNodes().stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n"));
|
||||||
|
}
|
||||||
|
|
||||||
protected final Index resolveLeaderIndex(String index) {
|
protected final Index resolveLeaderIndex(String index) {
|
||||||
GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get();
|
GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get();
|
||||||
assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));
|
assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));
|
||||||
|
|
Loading…
Reference in New Issue