[CCR] move tests that modify test cluster from the main test class to

a dedicated class.
This commit is contained in:
Martijn van Groningen 2018-10-29 13:42:47 +01:00
parent 69fe9a1bf1
commit b2daaf15d1
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
3 changed files with 296 additions and 263 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
@ -14,6 +15,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
@ -35,11 +38,17 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
@ -50,6 +59,7 @@ import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -69,6 +79,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ -89,6 +100,8 @@ public abstract class CcrIntegTestCase extends ESTestCase {
@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
clusterGroup.leaderCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster());
clusterGroup.followerCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster());
return;
}
@ -373,6 +386,90 @@ public abstract class CcrIntegTestCase extends ESTestCase {
return request;
}
protected void assertSameDocCount(String leaderIndex, String followerIndex) throws Exception {
refresh(leaderClient(), leaderIndex);
SearchRequest request1 = new SearchRequest(leaderIndex);
request1.source(new SearchSourceBuilder().size(0));
SearchResponse response1 = leaderClient().search(request1).actionGet();
assertBusy(() -> {
refresh(followerClient(), followerIndex);
SearchRequest request2 = new SearchRequest(followerIndex);
request2.source(new SearchSourceBuilder().size(0));
SearchResponse response2 = followerClient().search(request2).actionGet();
assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits()));
}, 60, TimeUnit.SECONDS);
}
protected void atLeastDocsIndexed(Client client, String index, long numDocsReplicated) throws InterruptedException {
logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index);
awaitBusy(() -> {
refresh(client, index);
SearchRequest request = new SearchRequest(index);
request.source(new SearchSourceBuilder().size(0));
SearchResponse response = client.search(request).actionGet();
return response.getHits().getTotalHits() >= numDocsReplicated;
}, 60, TimeUnit.SECONDS);
}
protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index followerIndex, int numberOfShards) throws Exception {
assertBusy(() -> {
long[] msuOnLeader = new long[numberOfShards];
for (int i = 0; i < msuOnLeader.length; i++) {
msuOnLeader[i] = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
Set<String> leaderNodes = getLeaderCluster().nodesInclude(leaderIndex.getName());
for (String leaderNode : leaderNodes) {
IndicesService indicesService = getLeaderCluster().getInstance(IndicesService.class, leaderNode);
for (int i = 0; i < numberOfShards; i++) {
IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
if (shard != null) {
try {
msuOnLeader[i] = SequenceNumbers.max(msuOnLeader[i], shard.getMaxSeqNoOfUpdatesOrDeletes());
} catch (AlreadyClosedException ignored) {
return;
}
}
}
}
Set<String> followerNodes = getFollowerCluster().nodesInclude(followerIndex.getName());
for (String followerNode : followerNodes) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, followerNode);
for (int i = 0; i < numberOfShards; i++) {
IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
if (shard != null) {
try {
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(msuOnLeader[i]));
} catch (AlreadyClosedException ignored) {
}
}
}
}
});
}
protected void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : getFollowerCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null && shard.routingEntry().primary()) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

View File

@ -0,0 +1,199 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
public class FollowerFailOverIT extends CcrIntegTestCase {
@Override
protected boolean reuseClusters() {
return false;
}
public void testFailOverOnFollower() throws Exception {
int numberOfReplicas = between(1, 2);
getFollowerCluster().startMasterOnlyNode();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 8)];
AtomicInteger docID = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false) {
try {
if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
leaderClient().prepareDelete("leader-index", "doc", id).get();
}
} catch (NodeClosedException ignored) {
}
}
});
threads[i].start();
}
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
follow.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
follow.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
follow.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
follow.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(20, 60));
final ClusterState clusterState = getFollowerCluster().clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
if (shardRouting.primary()) {
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
break;
}
}
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(80, 150));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
}
assertSameDocCount("leader-index", "follower-index");
pauseFollow("follower-index");
}
public void testFollowIndexAndCloseNode() throws Exception {
getFollowerCluster().ensureAtLeastNumDataNodes(3);
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
AtomicBoolean run = new AtomicBoolean(true);
Thread thread = new Thread(() -> {
int counter = 0;
while (run.get()) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++);
try {
leaderClient().prepareIndex("index1", "doc")
.setSource(source, XContentType.JSON)
.setTimeout(TimeValue.timeValueSeconds(1))
.get();
} catch (Exception e) {
logger.error("Error while indexing into leader index", e);
}
}
});
thread.start();
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
followRequest.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getFollowRequest().getMaxReadRequestOperationCount(),
followRequest.getFollowRequest().getMaxReadRequestOperationCount() * 10));
long minNumDocsReplicated = maxNumDocsReplicated / 3L;
logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated);
atLeastDocsIndexed(followerClient(), "index2", minNumDocsReplicated);
getFollowerCluster().stopRandomNonMasterNode();
logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated);
atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated);
run.set(false);
thread.join();
assertSameDocCount("index1", "index2");
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 3);
}
public void testAddNewReplicasOnFollower() throws Exception {
int numberOfReplicas = between(0, 1);
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3));
ensureFollowerGreen("follower-index");
AtomicBoolean stopped = new AtomicBoolean();
AtomicInteger docID = new AtomicInteger();
boolean appendOnly = randomBoolean();
Thread indexingOnLeader = new Thread(() -> {
while (stopped.get() == false) {
try {
if (appendOnly) {
String id = Integer.toString(docID.incrementAndGet());
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 100));
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
leaderClient().prepareDelete("leader-index", "doc", id).get();
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
indexingOnLeader.start();
Thread flushingOnFollower = new Thread(() -> {
while (stopped.get() == false) {
try {
if (rarely()) {
followerClient().admin().indices().prepareFlush("follower-index").get();
}
if (rarely()) {
followerClient().admin().indices().prepareRefresh("follower-index").get();
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
flushingOnFollower.start();
atLeastDocsIndexed(followerClient(), "follower-index", 50);
followerClient().admin().indices().prepareUpdateSettings("follower-index")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", 100);
stopped.set(true);
flushingOnFollower.join();
indexingOnLeader.join();
assertSameDocCount("leader-index", "follower-index");
pauseFollow("follower-index");
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ccr;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@ -20,15 +19,10 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -38,22 +32,13 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction.StatsRequest;
@ -65,7 +50,6 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -73,10 +57,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -269,54 +251,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards);
}
public void testFollowIndexAndCloseNode() throws Exception {
getFollowerCluster().ensureAtLeastNumDataNodes(3);
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
AtomicBoolean run = new AtomicBoolean(true);
Thread thread = new Thread(() -> {
int counter = 0;
while (run.get()) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++);
try {
leaderClient().prepareIndex("index1", "doc")
.setSource(source, XContentType.JSON)
.setTimeout(TimeValue.timeValueSeconds(1))
.get();
} catch (Exception e) {
logger.error("Error while indexing into leader index", e);
}
}
});
thread.start();
PutFollowAction.Request followRequest = putFollow("index1", "index2");
followRequest.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
followRequest.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
followRequest.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getFollowRequest().getMaxReadRequestOperationCount(),
followRequest.getFollowRequest().getMaxReadRequestOperationCount() * 10));
long minNumDocsReplicated = maxNumDocsReplicated / 3L;
logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated);
atLeastDocsIndexed(followerClient(), "index2", minNumDocsReplicated);
getFollowerCluster().stopRandomNonMasterNode();
logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated);
atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated);
run.set(false);
thread.join();
assertSameDocCount("index1", "index2");
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 3);
}
public void testFollowIndexWithNestedField() throws Exception {
final String leaderIndexSettings =
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
@ -587,61 +521,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
}
public void testFailOverOnFollower() throws Exception {
int numberOfReplicas = between(1, 2);
getFollowerCluster().startMasterOnlyNode();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 8)];
AtomicInteger docID = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false) {
try {
if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
leaderClient().prepareDelete("leader-index", "doc", id).get();
}
} catch (NodeClosedException ignored) {
}
}
});
threads[i].start();
}
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
follow.getFollowRequest().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
follow.getFollowRequest().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getFollowRequest().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
follow.getFollowRequest().setMaxWriteRequestOperationCount(randomIntBetween(32, 2048));
follow.getFollowRequest().setMaxWriteRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
follow.getFollowRequest().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(20, 60));
final ClusterState clusterState = getFollowerCluster().clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
if (shardRouting.primary()) {
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
break;
}
}
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(80, 150));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
}
assertSameDocCount("leader-index", "follower-index");
pauseFollow("follower-index");
}
public void testUnknownClusterAlias() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
@ -661,64 +540,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
}
public void testAddNewReplicasOnFollower() throws Exception {
int numberOfReplicas = between(0, 1);
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(2, 3));
ensureFollowerGreen("follower-index");
AtomicBoolean stopped = new AtomicBoolean();
AtomicInteger docID = new AtomicInteger();
boolean appendOnly = randomBoolean();
Thread indexingOnLeader = new Thread(() -> {
while (stopped.get() == false) {
try {
if (appendOnly) {
String id = Integer.toString(docID.incrementAndGet());
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 100));
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
leaderClient().prepareDelete("leader-index", "doc", id).get();
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
indexingOnLeader.start();
Thread flushingOnFollower = new Thread(() -> {
while (stopped.get() == false) {
try {
if (rarely()) {
followerClient().admin().indices().prepareFlush("follower-index").get();
}
if (rarely()) {
followerClient().admin().indices().prepareRefresh("follower-index").get();
}
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
});
flushingOnFollower.start();
atLeastDocsIndexed(followerClient(), "follower-index", 50);
followerClient().admin().indices().prepareUpdateSettings("follower-index")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", 100);
stopped.set(true);
flushingOnFollower.join();
indexingOnLeader.join();
assertSameDocCount("leader-index", "follower-index");
pauseFollow("follower-index");
}
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
@ -817,88 +638,4 @@ public class IndexFollowingIT extends CcrIntegTestCase {
return settings;
}
private void atLeastDocsIndexed(Client client, String index, long numDocsReplicated) throws InterruptedException {
logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index);
awaitBusy(() -> {
refresh(client, index);
SearchRequest request = new SearchRequest(index);
request.source(new SearchSourceBuilder().size(0));
SearchResponse response = client.search(request).actionGet();
return response.getHits().getTotalHits() >= numDocsReplicated;
}, 60, TimeUnit.SECONDS);
}
private void assertSameDocCount(String leaderIndex, String followerIndex) throws Exception {
refresh(leaderClient(), leaderIndex);
SearchRequest request1 = new SearchRequest(leaderIndex);
request1.source(new SearchSourceBuilder().size(0));
SearchResponse response1 = leaderClient().search(request1).actionGet();
assertBusy(() -> {
refresh(followerClient(), followerIndex);
SearchRequest request2 = new SearchRequest(followerIndex);
request2.source(new SearchSourceBuilder().size(0));
SearchResponse response2 = followerClient().search(request2).actionGet();
assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits()));
}, 60, TimeUnit.SECONDS);
}
private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index followerIndex, int numberOfShards) throws Exception {
assertBusy(() -> {
long[] msuOnLeader = new long[numberOfShards];
for (int i = 0; i < msuOnLeader.length; i++) {
msuOnLeader[i] = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
Set<String> leaderNodes = getLeaderCluster().nodesInclude(leaderIndex.getName());
for (String leaderNode : leaderNodes) {
IndicesService indicesService = getLeaderCluster().getInstance(IndicesService.class, leaderNode);
for (int i = 0; i < numberOfShards; i++) {
IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
if (shard != null) {
try {
msuOnLeader[i] = SequenceNumbers.max(msuOnLeader[i], shard.getMaxSeqNoOfUpdatesOrDeletes());
} catch (AlreadyClosedException ignored) {
return;
}
}
}
}
Set<String> followerNodes = getFollowerCluster().nodesInclude(followerIndex.getName());
for (String followerNode : followerNodes) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, followerNode);
for (int i = 0; i < numberOfShards; i++) {
IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
if (shard != null) {
try {
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(msuOnLeader[i]));
} catch (AlreadyClosedException ignored) {
}
}
}
}
});
}
private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : getFollowerCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null && shard.routingEntry().primary()) {
try {
FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
} catch (AlreadyClosedException e) {
throw new AssertionError(e); // causes assertBusy to retry
}
}
}
}
assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
});
}
}