mirror of https://github.com/apache/lucene.git
SOLR-13860: Enable back TestTlogReplica (#965)
* Wait for leader in testBasicLeaderElection. It can take some time (>4 seconds) to elect a new leader, and if the update is attempted immediately it'll fail. Need to either wait for the leader or retry the update in case of failure (which is what clients would do)
* Wait for leader to be active in testKillTlogReplica
* Add hack to prevent unrelated failure
* Reduce the wait time for replica state change
* A more robust attempt to add replicas in the tests
* Wait for replication for 2 times the replication time
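The fixes above boil down to a wait-or-retry pattern around updates sent right after a leader goes away. A minimal SolrJ sketch of the retry side follows; it is not code from this patch, and the class name, attempt count, and delay are illustrative assumptions:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

class RetryUpdateSketch {
  private static final int RETRY_ATTEMPTS = 5;       // assumed value, not from the patch
  private static final long RETRY_DELAY_MS = 1000L;  // assumed value, not from the patch

  // Retries an update until a leader is active, the way an external client would.
  static void addWithRetry(SolrClient client, String collection, SolrInputDocument doc)
      throws Exception {
    Exception last = null;
    for (int attempt = 0; attempt < RETRY_ATTEMPTS; attempt++) {
      try {
        // The same request a client would send; it succeeds once a leader is serving the shard.
        new UpdateRequest().add(doc).process(client, collection);
        return;
      } catch (SolrServerException | SolrException e) {
        last = e;                      // likely no active leader yet
        Thread.sleep(RETRY_DELAY_MS);  // back off before the next attempt
      }
    }
    throw last;                        // give up after the configured attempts
  }
}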
This commit is contained in:
parent 21e58bc128
commit c82aa3e61b
@@ -39,7 +39,6 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
@@ -76,20 +75,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slow
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12313")
public class TestTlogReplica extends SolrCloudTestCase {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private String collectionName = null;
  private final static int REPLICATION_TIMEOUT_SECS = 10;

  private String suggestedCollectionName() {
    return (getTestClass().getSimpleName().replace("Test", "") + "_" + getSaferTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT);
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    System.setProperty("solr.waitToSeeReplicasInStateTimeoutSeconds", "30");
    configureCluster(2) // 2 + random().nextInt(3)
        .addConfig("conf", configset("cloud-minimal-inplace-updates"))
        .configure();
@@ -99,12 +98,12 @@ public class TestTlogReplica extends SolrCloudTestCase {
    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
    assertEquals(0, response.getStatus());
  }

  @AfterClass
  public static void tearDownCluster() {
    TestInjection.reset();
  }

  @Override
  public void setUp() throws Exception {
    super.setUp();
@@ -127,7 +126,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
    }
    super.tearDown();
  }

  /**
   * Asserts that Update logs exist for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#NRT}, but not
   * for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#PULL}
@@ -139,7 +138,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
        try {
          core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName());
          assertNotNull(core);
          assertTrue("Update log should exist for replicas of type Append",
              new java.io.File(core.getUlogDir()).exists());
        } finally {
          core.close();
@@ -147,99 +146,94 @@ public class TestTlogReplica extends SolrCloudTestCase {
      }
    }
  }

  @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection
  // commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 09-Aug-2018
  public void testCreateDelete() throws Exception {
    try {
      switch (random().nextInt(3)) {
        case 0:
          CollectionAdminRequest.createCollection(collectionName, "conf", 2, 0, 4, 0)
              .setMaxShardsPerNode(100)
              .process(cluster.getSolrClient());
          cluster.waitForActiveCollection(collectionName, 2, 8);
          break;
        case 1:
          // Sometimes don't use SolrJ
          String url = String.format(Locale.ROOT, "%s/admin/collections?action=CREATE&name=%s&collection.configName=%s&numShards=%s&tlogReplicas=%s&maxShardsPerNode=%s",
              cluster.getRandomJetty(random()).getBaseUrl(),
              collectionName, "conf",
              2, // numShards
              4, // tlogReplicas
              100); // maxShardsPerNode
          HttpGet createCollectionGet = new HttpGet(url);
          HttpResponse httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionGet);
          assertEquals(200, httpResponse.getStatusLine().getStatusCode());
          cluster.waitForActiveCollection(collectionName, 2, 8);
          break;
        case 2:
          // Sometimes use V2 API
          url = cluster.getRandomJetty(random()).getBaseUrl().toString() + "/____v2/c";
          String requestBody = String.format(Locale.ROOT, "{create:{name:%s, config:%s, numShards:%s, tlogReplicas:%s, maxShardsPerNode:%s}}",
              collectionName, "conf",
              2, // numShards
              4, // tlogReplicas
              100); // maxShardsPerNode
          HttpPost createCollectionPost = new HttpPost(url);
          createCollectionPost.setHeader("Content-type", "application/json");
          createCollectionPost.setEntity(new StringEntity(requestBody));
          httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionPost);
          assertEquals(200, httpResponse.getStatusLine().getStatusCode());
          cluster.waitForActiveCollection(collectionName, 2, 8);
          break;
      }

      boolean reloaded = false;
      while (true) {
        DocCollection docCollection = getCollectionState(collectionName);
        assertNotNull(docCollection);
        assertEquals("Expecting 2 shards",
            2, docCollection.getSlices().size());
        assertEquals("Expecting 4 relpicas per shard",
            8, docCollection.getReplicas().size());
        assertEquals("Expecting 8 tlog replicas, 4 per shard",
            8, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
        assertEquals("Expecting no nrt replicas",
            0, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
        assertEquals("Expecting no pull replicas",
            0, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
        for (Slice s:docCollection.getSlices()) {
          assertTrue(s.getLeader().getType() == Replica.Type.TLOG);
          List<String> shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true);
          assertEquals("Unexpected election nodes for Shard: " + s.getName() + ": " + Arrays.toString(shardElectionNodes.toArray()),
              4, shardElectionNodes.size());
        }
        assertUlogPresence(docCollection);
        if (reloaded) {
          break;
        } else {
          // reload
          CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName)
              .process(cluster.getSolrClient());
          assertEquals(0, response.getStatus());
          waitForState("failed waiting for active colletion", collectionName, clusterShape(2, 8));
          reloaded = true;
        }
      }
    } finally {
      zkClient().printLayoutToStdOut();
    }
  }

  @SuppressWarnings("unchecked")
  public void testAddDocs() throws Exception {
    int numTlogReplicas = 1 + random().nextInt(3);
    DocCollection docCollection = createAndWaitForCollection(1, 0, numTlogReplicas, 0);
    assertEquals(1, docCollection.getSlices().size());

    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
    cluster.getSolrClient().commit(collectionName);

    Slice s = docCollection.getSlices().iterator().next();
    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
    }

    TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
    for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.TLOG))) {
      //TODO: assert replication < REPLICATION_TIMEOUT_SECS
@@ -253,7 +247,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
            "qt", "/admin/plugins",
            "stats", "true");
        QueryResponse statsResponse = tlogReplicaClient.query(req);
        assertEquals("Append replicas should recive all updates. Replica: " + r + ", response: " + statsResponse,
            1L, ((Map<String, Object>)((NamedList<Object>)statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.cumulativeAdds.count"));
        break;
      } catch (AssertionError e) {
@@ -268,27 +262,27 @@ public class TestTlogReplica extends SolrCloudTestCase {
    }
    assertUlogPresence(docCollection);
  }

  public void testAddRemoveTlogReplica() throws Exception {
    DocCollection docCollection = createAndWaitForCollection(2, 0, 1, 0);
    assertEquals(2, docCollection.getSlices().size());

    addReplicaToShard("shard1", Replica.Type.TLOG);
    docCollection = assertNumberOfReplicas(0, 3, 0, true, false);
    addReplicaToShard("shard2", Replica.Type.TLOG);
    docCollection = assertNumberOfReplicas(0, 4, 0, true, false);

    waitForState("Expecting collection to have 2 shards and 2 replica each", collectionName, clusterShape(2, 4));

    //Delete tlog replica from shard1
    CollectionAdminRequest.deleteReplica(
        collectionName,
        "shard1",
        docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.TLOG)).get(0).getName())
        .process(cluster.getSolrClient());
    assertNumberOfReplicas(0, 3, 0, true, true);
  }

  private void addReplicaToShard(String shardName, Replica.Type type) throws ClientProtocolException, IOException, SolrServerException {
    switch (random().nextInt(3)) {
      case 0: // Add replica with SolrJ
@@ -296,8 +290,8 @@ public class TestTlogReplica extends SolrCloudTestCase {
        assertEquals("Unexpected response status: " + response.getStatus(), 0, response.getStatus());
        break;
      case 1: // Add replica with V1 API
        String url = String.format(Locale.ROOT, "%s/admin/collections?action=ADDREPLICA&collection=%s&shard=%s&type=%s",
            cluster.getRandomJetty(random()).getBaseUrl(),
            collectionName,
            shardName,
            type);
@@ -306,10 +300,10 @@ public class TestTlogReplica extends SolrCloudTestCase {
        assertEquals(200, httpResponse.getStatusLine().getStatusCode());
        break;
      case 2:// Add replica with V2 API
        url = String.format(Locale.ROOT, "%s/____v2/c/%s/shards",
            cluster.getRandomJetty(random()).getBaseUrl(),
            collectionName);
        String requestBody = String.format(Locale.ROOT, "{add-replica:{shard:%s, type:%s}}",
            shardName,
            type);
        HttpPost addReplicaPost = new HttpPost(url);
@@ -320,15 +314,15 @@ public class TestTlogReplica extends SolrCloudTestCase {
        break;
    }
  }

  public void testRemoveLeader() throws Exception {
    doReplaceLeader(true);
  }

  public void testKillLeader() throws Exception {
    doReplaceLeader(false);
  }

  public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException {
    // should be redirected to Replica.Type.REALTIME
    int numReplicas = random().nextBoolean()?1:2;
@@ -373,13 +367,13 @@ public class TestTlogReplica extends SolrCloudTestCase {
      id++;
    }
  }

  /*
   * validate leader election and that replication still happens on a new leader
   */
  private void doReplaceLeader(boolean removeReplica) throws Exception {
    DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0);

    // Add a document and commit
    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
    cluster.getSolrClient().commit(collectionName);
@@ -387,15 +381,15 @@ public class TestTlogReplica extends SolrCloudTestCase {
    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
    }

    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS);

    // Delete leader replica from shard1
    JettySolrRunner leaderJetty = null;
    if (removeReplica) {
      CollectionAdminRequest.deleteReplica(
          collectionName,
          "shard1",
          s.getLeader().getName())
          .process(cluster.getSolrClient());
    } else {
@@ -403,32 +397,24 @@ public class TestTlogReplica extends SolrCloudTestCase {
      leaderJetty.stop();
      waitForState("Leader replica not removed", collectionName, clusterShape(1, 1));
      // Wait for cluster state to be updated
      waitForState("Replica state not updated in cluster state",
          collectionName, clusterStateReflectsActiveAndDownReplicas());
    }
    docCollection = assertNumberOfReplicas(0, 1, 0, true, true);

    // Wait until a new leader is elected
    TimeOut t = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
    while (!t.hasTimedOut()) {
      docCollection = getCollectionState(collectionName);
      Replica leader = docCollection.getSlice("shard1").getLeader();
      if (leader != null && leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) {
        break;
      }
      Thread.sleep(500);
    }
    assertFalse("Timeout waiting for a new leader to be elected", t.hasTimedOut());
    waitForLeaderChange(leaderJetty, "shard1");

    // There is a new leader, I should be able to add and commit
    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"));
    cluster.getSolrClient().commit(collectionName);

    // Queries should still work
    waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS);
    // Start back the node
    if (removeReplica) {
      CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.TLOG).process(cluster.getSolrClient());
      addReplicaWithRetries();
    } else {
      leaderJetty.start();
    }
@@ -436,35 +422,51 @@ public class TestTlogReplica extends SolrCloudTestCase {
    // added replica should replicate from the leader
    waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS);
  }

  private void addReplicaWithRetries() throws SolrServerException, IOException {
    int maxAttempts = 3;
    for (int i = 0; i < maxAttempts ; i++) {
      try {
        CollectionAdminResponse respone = CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.TLOG).process(cluster.getSolrClient());
        // This is an unfortunate hack. There are cases where the ADDREPLICA fails, will create a Jira to address that separately. for now, we'll retry
        if (respone.isSuccess()) {
          break;
        }
        log.error("Unsuccessful atempt to add replica. Attempt: %d/%d", i, maxAttempts);
      } catch (SolrException e) {
        log.error("Exception while adding replica. Attempt: " + i + "/" + maxAttempts, e);
      }
    }
  }

  public void testKillTlogReplica() throws Exception {
    DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0);

    waitForNumDocsInAllActiveReplicas(0);
    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
    cluster.getSolrClient().commit(collectionName);
    waitForNumDocsInAllActiveReplicas(1);

    JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.TLOG)).get(0));
    pullReplicaJetty.stop();
    waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0));
    waitForLeaderChange(pullReplicaJetty, "shard1");
    // // Also wait for the replica to be placed in state="down"
    // waitForState("Didn't update state", collectionName, clusterStateReflectsActiveAndDownReplicas());

    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
    cluster.getSolrClient().commit(collectionName);
    waitForNumDocsInAllActiveReplicas(2);

    pullReplicaJetty.start();
    waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0));
    waitForNumDocsInAllActiveReplicas(2);
  }

  @Test
  // Removed BadApple on 2018-05-21
  public void testOnlyLeaderIndexes() throws Exception {
    createAndWaitForCollection(1, 0, 2, 0);

    CloudSolrClient cloudClient = cluster.getSolrClient();
    new UpdateRequest()
        .add(sdoc("id", "1"))
@@ -525,13 +527,13 @@ public class TestTlogReplica extends SolrCloudTestCase {
      }
    }
    checkRTG(120,150, cluster.getJettySolrRunners());
    waitForReplicasCatchUp(20);
    waitForReplicasCatchUp(4 * REPLICATION_TIMEOUT_SECS);
  }

  @SuppressWarnings("unchecked")
  public void testRecovery() throws Exception {
    createAndWaitForCollection(1, 0, 2, 0);

    CloudSolrClient cloudClient = cluster.getSolrClient();
    new UpdateRequest()
        .add(sdoc("id", "3"))
@@ -551,13 +553,12 @@ public class TestTlogReplica extends SolrCloudTestCase {
    // We skip peerSync, so replica will always trigger commit on leader
    // We query only the non-leader replicas, since we haven't opened a new searcher on the leader yet
    waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state

    // If I add the doc immediately, the leader fails to communicate with the follower with broken pipe.
    // Options are, wait or retry...
    for (int i = 0; i < 3; i++) {
      UpdateRequest ureq = new UpdateRequest().add(sdoc("id", "7"));
      ureq.setParam("collection", collectionName);
      ureq.setParam(UpdateRequest.MIN_REPFACT, "2");
      NamedList<Object> response = cloudClient.request(ureq);
      if ((Integer)((NamedList<Object>)response.get("responseHeader")).get(UpdateRequest.REPFACT) >= 2) {
        break;
@@ -595,7 +596,6 @@ public class TestTlogReplica extends SolrCloudTestCase {
    for (int i = 0; i < 3; i++) {
      UpdateRequest ureq = new UpdateRequest().add(sdoc("id", "8"));
      ureq.setParam("collection", collectionName);
      ureq.setParam(UpdateRequest.MIN_REPFACT, "2");
      NamedList<Object> response = cloudClient.request(ureq);
      if ((Integer)((NamedList<Object>)response.get("responseHeader")).get(UpdateRequest.REPFACT) >= 2) {
        break;
@@ -616,12 +616,12 @@ public class TestTlogReplica extends SolrCloudTestCase {
      iwRef.decref();
    }
  }

  private List<Replica> getNonLeaderReplias(String collectionName) {
    return getCollectionState(collectionName).getReplicas().stream().filter(
        (r)-> !r.getBool("leader", false)).collect(Collectors.toList());
  }

  public void testDeleteById() throws Exception{
    createAndWaitForCollection(1,0,2,0);
    CloudSolrClient cloudClient = cluster.getSolrClient();
@@ -644,7 +644,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
    }
    assertFalse("Doc1 is deleted but it's still exist", successs);
  }

  public void testBasicLeaderElection() throws Exception {
    createAndWaitForCollection(1,0,2,0);
    CloudSolrClient cloudClient = cluster.getSolrClient();
@@ -658,7 +658,11 @@ public class TestTlogReplica extends SolrCloudTestCase {
    JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
    oldLeaderJetty.stop();
    waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0));
    new UpdateRequest()

    // Even after the replica is gone, a leader may not be elected yet. Wait for it.
    waitForLeaderChange(oldLeaderJetty, "shard1");

    new UpdateRequest()
        .add(sdoc("id", "3"))
        .add(sdoc("id", "4"))
        .process(cloudClient, collectionName);
@@ -669,7 +673,19 @@ public class TestTlogReplica extends SolrCloudTestCase {
        .commit(cloudClient, collectionName);
    waitForNumDocsInAllActiveReplicas(4, 0);
  }

  private void waitForLeaderChange(JettySolrRunner oldLeaderJetty, String shardName) {
    waitForState("Expect new leader", collectionName,
        (liveNodes, collectionState) -> {
          Replica leader = collectionState.getLeader(shardName);
          if (leader == null || !leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) {
            return false;
          }
          return oldLeaderJetty == null || !leader.getNodeName().equals(oldLeaderJetty.getNodeName());
        }
    );
  }

  public void testOutOfOrderDBQWithInPlaceUpdates() throws Exception {
    createAndWaitForCollection(1,0,2,0);
    assertFalse(getSolrCore(true).get(0).getLatestSchema().getField("inplace_updatable_int").indexed());
@@ -687,23 +703,16 @@ public class TestTlogReplica extends SolrCloudTestCase {
      }
    }
    JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
    String oldLeaderNodeName = oldLeaderJetty.getNodeName();
    oldLeaderJetty.stop();
    waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0));
    waitForState("Expect new leader", collectionName,
        (liveNodes, collectionState) -> {
          Replica leader = collectionState.getLeader("shard1");
          if (leader == null) return false;
          return !leader.getNodeName().equals(oldLeaderNodeName);
        }
    );
    waitForLeaderChange(oldLeaderJetty, "shard1");
    oldLeaderJetty.start();
    waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0));
    checkRTG(1,1, cluster.getJettySolrRunners());
    SolrDocument doc = cluster.getSolrClient().getById(collectionName,"1");
    assertNotNull(doc.get("title_s"));
  }

  private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException {
    SolrInputDocument doc = sdoc(fields);
@@ -747,20 +756,20 @@ public class TestTlogReplica extends SolrCloudTestCase {
        collectionName, clusterShape(numShards, numShards * numReplicasPerShard));
    return assertNumberOfReplicas(numNrtReplicas*numShards, numTlogReplicas*numShards, numPullReplicas*numShards, false, true);
  }

  private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException {
    waitForNumDocsInAllActiveReplicas(numDocs, REPLICATION_TIMEOUT_SECS);
  }

  private void waitForNumDocsInAllActiveReplicas(int numDocs, int timeout) throws IOException, SolrServerException, InterruptedException {
    DocCollection docCollection = getCollectionState(collectionName);
    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()), timeout);
  }

  private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, int timeout) throws IOException, SolrServerException, InterruptedException {
    waitForNumDocsInAllReplicas(numDocs, replicas, "*:*", timeout);
  }

  private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query, int timeout) throws IOException, SolrServerException, InterruptedException {
    TimeOut t = new TimeOut(timeout, TimeUnit.SECONDS, TimeSource.NANO_TIME);
    for (Replica r:replicas) {
@@ -784,7 +793,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
      }
    }
  }

  private void waitForDeletion(String collection) throws InterruptedException, KeeperException {
    TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
    while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) {
@@ -797,25 +806,25 @@ public class TestTlogReplica extends SolrCloudTestCase {
      } catch(SolrException e) {
        return;
      }

    }
  }

  private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
    if (updateCollection) {
      cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
    }
    DocCollection docCollection = getCollectionState(collectionName);
    assertNotNull(docCollection);
    assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas,
        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
    assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas,
        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
    assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas,
        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
    return docCollection;
  }

  /*
   * passes only if all replicas are active or down, and the "liveNodes" reflect the same status
   */
@@ -835,8 +844,8 @@ public class TestTlogReplica extends SolrCloudTestCase {
      return true;
    };
  }

  private CollectionStatePredicate activeReplicaCount(int numNrtReplicas, int numTlogReplicas, int numPullReplicas) {
    return (liveNodes, collectionState) -> {
      int nrtFound = 0, tlogFound = 0, pullFound = 0;
@@ -863,7 +872,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
      return numNrtReplicas == nrtFound && numTlogReplicas == tlogFound && numPullReplicas == pullFound;
    };
  }

  private List<SolrCore> getSolrCore(boolean isLeader) {
    List<SolrCore> rs = new ArrayList<>();
@@ -885,7 +894,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
    }
    return rs;
  }

  private void checkRTG(int from, int to, List<JettySolrRunner> solrRunners) throws Exception{
    for (JettySolrRunner solrRunner: solrRunners) {
      try (SolrClient client = solrRunner.newClient()) {
@@ -900,7 +909,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
      }
    }
  }

  private List<JettySolrRunner> getSolrRunner(boolean isLeader) {
    List<JettySolrRunner> rs = new ArrayList<>();
    CloudSolrClient cloudClient = cluster.getSolrClient();
@@ -920,7 +929,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
    }
    return rs;
  }

  private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException {
    String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
    if (leaderTimeCommit == null) return;